Feat: Add Batched Video Encoding for Faster Dataset Recording #1390
Signed-off-by: Xingdong Zuo <[email protected]>
Thank you for addressing this, it's been a pain point for me too.
Thanks so much @jackvial
# Update video info in metadata
self.meta.update_video_info()

# Clean up image directories after successful encoding
img_dir = self.root / "images"
if img_dir.is_dir():
    shutil.rmtree(img_dir)
I'm wondering if it would be clearer to put update_video_info and the raw image cleanup directly in encode_episode_videos, instead of having them spread across save_episode (no batch) and batch_encode_episode (batch). Wdyt?
Thanks very much Caroline for sharing your opinion!
I agree with you that it's cleaner to put update video info + raw image cleanup in encode_episode_videos
I've tried to reflect it in 2085da0#diff-5c4793c216dd4a0a27f823ad9abf84afffca677ac2af404b21b6ec77bbdb12e8L950-L968
Please feel free to share your thoughts on this change 😉
lerobot/record.py
Outdated
@@ -324,9 +332,33 @@ def record(cfg: RecordConfig) -> LeRobotDataset:
dataset.clear_episode_buffer()
continue

dataset.save_episode()
# Save episode with or without video encoding based on batch size
We will need to add a failsafe for when teleoperation crashes in the middle of a batch. I tested on my side: if teleoperation fails mid-batch, the episodes already recorded are not encoded as videos.
Following the decorator approach of safe_stop_image_writer should do the trick!
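A minimal sketch of the decorator idea described above, not the code that ended up in this PR. It assumes the pattern is "wrap the recording function, flush pending work if it raises, then re-raise". The names encode_pending_on_failure, FakeDataset, and encode_pending are hypothetical and exist only for illustration.

```python
import functools


def encode_pending_on_failure(func):
    """Hypothetical decorator: encode un-encoded episodes if `func` raises."""

    @functools.wraps(func)
    def wrapper(dataset, *args, **kwargs):
        try:
            return func(dataset, *args, **kwargs)
        except Exception:
            # Encode whatever was recorded before the crash, then re-raise
            # so the caller still sees the original error.
            dataset.encode_pending()
            raise

    return wrapper


class FakeDataset:
    """Stand-in for a dataset (assumption): tracks episodes awaiting encoding."""

    def __init__(self):
        self.pending = []
        self.encoded = []

    def save_episode(self, index):
        self.pending.append(index)

    def encode_pending(self):
        self.encoded.extend(self.pending)
        self.pending.clear()


@encode_pending_on_failure
def record(dataset, crash_at):
    for index in range(5):
        if index == crash_at:
            raise RuntimeError("teleoperation crashed")
        dataset.save_episode(index)


dataset = FakeDataset()
try:
    record(dataset, crash_at=3)
except RuntimeError:
    pass  # the crash still propagates; the episodes were encoded first
```

The key design point is the bare `raise`: the safety net does its cleanup without hiding the failure from the caller.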
Wow, huge thanks for pointing out this case, I missed it :)
I implemented a VideoEncodingManager to handle this in b029966
Hi @zuoxingdong, Thank you for this PR, which addresses a recurrent issue raised by community members! I left a few review comments on your changes, two of which imply larger changes. Let me know if you need further details or help to implement these modifications. Thanks again for your contribution! Best, Caroline.
Interesting, I hadn't thought of approaching recording like that. Your batching approach will work for both of the workflows described above 🎉 And it's good that the default behavior remains the same as it is now
Thank you so much for this iteration ! I feel like you're getting closer 👀
I added a bunch of new comments, but I realize this might be a lot of work. Let me know if I can help you in any way !
Of course I'm open to discussion on all my comments, my ideas are certainly not the best ;)
Best,
Caroline.
@@ -957,8 +988,53 @@ def encode_episode_videos(self, episode_index: int) -> dict:
).parent
encode_video_frames(img_dir, video_path, self.fps, overwrite=True)
You can put
if cleanup_images:
    shutil.rmtree(img_dir)
right here to avoid building img_dir once again. It is safe because encode_video_frames raises an error if the video is not correctly created.
Ah, good idea!
Just to make sure I understood properly:
...
    img_dir = self._get_image_file_path(
        episode_index=episode_index, image_key=key, frame_index=0
    ).parent
    encode_video_frames(img_dir, video_path, self.fps, overwrite=True)
    if cleanup_images:  # <<< - ADD
        shutil.rmtree(img_dir)

# Update video info
if update_video_info and len(self.meta.video_keys) > 0:
    self.meta.update_video_info()

# REMOVE!
# # Cleanup raw images for this episode
# if cleanup_images:
#     for key in self.meta.video_keys:
#         img_dir = self._get_image_file_path(
#             episode_index=episode_index, image_key=key, frame_index=0
#         ).parent
#         if img_dir.exists():
#             shutil.rmtree(img_dir)
Is this what you meant?
Exactly! I realize now that was not super clear, thanks for deciphering it ;)
@@ -957,8 +988,53 @@ def encode_episode_videos(self, episode_index: int) -> dict:
).parent
encode_video_frames(img_dir, video_path, self.fps, overwrite=True)

# Update video info
if update_video_info and len(self.meta.video_keys) > 0:
Thinking about this: self.meta.update_video_info() actually only reads the first episode's videos to infer metadata. Maybe we should only call it when the first episode is modified/encoded?
Thanks a lot @CarolinePascal for bringing it up and sharing your opinion!
It helps me understand the workflow much better 😉
If my understanding is correct, as you mentioned above, update_video_info always reads from episode 0 to get video metadata, assuming all videos are encoded the same way.
Currently:
- update_video_info is called for every episode when update_video_info=True
New way:
- Call update_video_info only ONCE, when episode_index == 0
# Update video info (only needed when first episode is encoded since it reads from episode 0)
if update_video_info and len(self.meta.video_keys) > 0 and episode_index == 0:
    self.meta.update_video_info()
In addition, I feel like we do not need the update_video_info argument anymore, as it is only called on the first episode (after encoding).
How about removing it:
def encode_episode_videos(
    self, episode_index: int, cleanup_images: bool = True
) -> dict:
    ...
    if len(self.meta.video_keys) > 0 and episode_index == 0:
        self.meta.update_video_info()
    ...

def save_episode(self, episode_data: dict | None = None, encode_videos: bool = True) -> None:
    ...
    # Only encode videos if requested
    if encode_videos and len(self.meta.video_keys) > 0:
        video_paths = self.encode_episode_videos(episode_index, cleanup_images=False)
    ...

def batch_encode_videos(self, start_episode: int = 0, end_episode: int | None = None) -> None:
    ...
    # Encode all episodes but don't update video info or cleanup until the end
    for ep_idx in range(start_episode, end_episode):
        logging.info(f"Encoding videos for episode {ep_idx}")
        self.encode_episode_videos(
            ep_idx,
            cleanup_images=False,
        )
    ...
Thanks for the suggestion, it is way more efficient that way - no more rewriting the same thing at each new video encoding! Plus, if update_video_info() were to change in the future, we would just need to remove the episode_index == 0 condition - easy ;)
)

# Update video info once for the entire batch
self.meta.update_video_info()
See the previous comment on self.meta.update_video_info(): we could instead set update_video_info=True in self.encode_episode_videos, which would then only be triggered on the first episode encoding and never again.
Open to discussion on that topic!
# Update video info once for the entire batch
self.meta.update_video_info()

# Cleanup all images once for the entire batch
This assumes that the batch covers all existing episodes, which might not be the case depending on how the user calls batch_encode_videos. I think we could stick to setting cleanup_images=True in self.encode_episode_videos, check for remaining PNG files when the recording is over, and then remove the images folder if there are no images left!
Ah, that's a good point!
Your method is safer!
It prevents losing unprocessed episodes in some cases.
New:
- Set cleanup_images=True in the encode_episode_videos call, so that each episode cleans up its own images after encoding
- After the batch encoding is complete, only remove the entire images folder if it's empty or contains no PNG files.
# Encode all episodes with cleanup enabled for individual episodes
for ep_idx in range(start_episode, end_episode):
    logging.info(f"Encoding videos for episode {ep_idx}")
    self.encode_episode_videos(
        ep_idx,
        cleanup_images=True,
    )

img_dir = self.root / "images"
if img_dir.is_dir():
    # Check for any remaining PNG files
    png_files = list(img_dir.rglob("*.png"))
    if not png_files:
        # Only remove the images directory if no PNG files remain
        shutil.rmtree(img_dir)
        logging.info("Cleaned up empty images directory")
    else:
        logging.info(f"Images directory is not empty, containing {len(png_files)} PNG files")
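For anyone who wants to try the "only delete when no PNG remains" check in isolation, here is a standalone sketch on a temporary directory. The helper name remove_images_dir_if_no_pngs and the folder and file names are assumptions made for this example, not part of the PR.

```python
import shutil
import tempfile
from pathlib import Path


def remove_images_dir_if_no_pngs(img_dir: Path) -> bool:
    """Delete img_dir only if no PNG remains under it. Return True if deleted."""
    if not img_dir.is_dir():
        return False
    # rglob searches recursively, so PNGs in per-episode subfolders count too.
    if any(img_dir.rglob("*.png")):
        return False
    shutil.rmtree(img_dir)
    return True


# Build a throwaway layout with one leftover frame (hypothetical names).
root = Path(tempfile.mkdtemp())
images = root / "images"
episode_dir = images / "episode_000000"
episode_dir.mkdir(parents=True)
leftover = episode_dir / "frame_000000.png"
leftover.touch()

kept = remove_images_dir_if_no_pngs(images)      # a PNG remains: nothing deleted
leftover.unlink()                                # simulate per-episode cleanup
removed = remove_images_dir_if_no_pngs(images)   # only empty folders remain
images_exists_after = images.exists()

shutil.rmtree(root)  # tidy up the temporary directory
```

Using `any(...)` on the generator stops at the first PNG found, which avoids listing every remaining frame when the count is not needed.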
Looks good, thanks! I will run some tests to see if it works correctly on my side ;)
@@ -237,6 +240,75 @@ def record_loop(
timestamp = time.perf_counter() - start_episode_t


class VideoEncodingManager:
Wow! This is a very good safety net! I did not expect that much when mentioning it ;) However, I feel like we are adding another layer to already heavily layered code...
Do you think you could make it work with only the __exit__ method and the previous simple if use_batched_encoding: ... else: ?
Thanks for your warm words, Caroline! Yeah, I think it's always better to strive for simplicity lol~
How about the following:
class VideoEncodingManager:
    def __init__(self, dataset, batch_size, play_sounds=True):
        self.dataset = dataset
        self.batch_size = batch_size
        self.use_batched_encoding = batch_size > 1
        self.episodes_since_last_encoding = 0
        self.last_encoded_episode = dataset.num_episodes - 1
        self.play_sounds = play_sounds

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.use_batched_encoding and self.episodes_since_last_encoding > 0:
            if exc_type is not None:
                print("Exception occurred. Encoding pending episodes before exit...")
            else:
                print("Recording stopped. Encoding remaining episodes...")
            try:
                start_ep = self.last_encoded_episode + 1
                end_ep = self.dataset.num_episodes
                log_say(
                    f"Batch encoding videos for {self.episodes_since_last_encoding} episodes, "
                    f"from episode {start_ep} to episode {end_ep - 1}",
                    self.play_sounds,
                )
                self.dataset.batch_encode_videos(start_ep, end_ep)
            except Exception as e:
                print(f"Something went wrong while encoding videos on exit: {e}")
        return False  # Don't suppress the original exception


@parser.wrap()
def record(cfg: RecordConfig) -> LeRobotDataset:
    ...
    with VideoEncodingManager(
        dataset, cfg.dataset.video_encoding_batch_size, cfg.play_sounds
    ) as video_encoding_manager:
        recorded_episodes = 0
        while recorded_episodes < cfg.dataset.num_episodes and not events["stop_recording"]:
            log_say(f"Recording episode {dataset.num_episodes}", cfg.play_sounds)
            record_loop(
                ...
            )
            ...
            if video_encoding_manager.use_batched_encoding:
                dataset.save_episode(encode_videos=False)
                video_encoding_manager.episodes_since_last_encoding += 1
                # Check if we should encode current batch
                if video_encoding_manager.episodes_since_last_encoding >= video_encoding_manager.batch_size:
                    start_ep = video_encoding_manager.last_encoded_episode + 1
                    end_ep = dataset.num_episodes
                    log_say(
                        f"Batch encoding videos for {video_encoding_manager.episodes_since_last_encoding} episodes, "
                        f"from episode {start_ep} to episode {end_ep - 1}",
                        cfg.play_sounds,
                    )
                    dataset.batch_encode_videos(start_ep, end_ep)
                    video_encoding_manager.last_encoded_episode = end_ep - 1
                    video_encoding_manager.episodes_since_last_encoding = 0
            else:
                # Default behavior: encode videos immediately
                dataset.save_episode(encode_videos=True)
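To see the exit behavior without a robot attached, here is a trimmed-down, self-contained sketch of the same context-manager pattern. PendingEncoder and its encode_fn argument are hypothetical stand-ins for illustration. It only shows that __exit__ runs when the body raises and that returning False lets the original exception propagate.

```python
class PendingEncoder:
    """Hypothetical, simplified stand-in for the VideoEncodingManager above."""

    def __init__(self, encode_fn):
        self.encode_fn = encode_fn  # called with the list of pending episodes
        self.pending = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.pending:
            try:
                self.encode_fn(list(self.pending))
                self.pending.clear()
            except Exception as e:
                # Report the encoding failure but keep the original error visible.
                print(f"Encoding on exit failed: {e}")
        return False  # do not suppress an exception raised inside the with-body


encoded = []
raised = False
try:
    with PendingEncoder(encoded.extend) as manager:
        for episode in range(3):
            manager.pending.append(episode)
        raise RuntimeError("teleoperation crashed mid-batch")
except RuntimeError:
    raised = True  # the crash still reaches the caller
```

After this runs, the three recorded episodes have been passed to the encoder even though the loop crashed, and the RuntimeError was not swallowed.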
And simplicity often brings the most durable solutions! This version looks really good to me, I'll do some testing as well to see if it works correctly.
Hi @zuoxingdong, Thank you again for your contribution! I feel like we're getting close to a merge now, as I have no more comments on your code ;) I still need to run some tests on my side (to check that the metadata are correctly uploaded, that the raw PNG files are correctly deleted when needed, and that the exception case is correctly handled). I'll let you know when it's all done! Best, Caroline.
Motivation
During the Seoul Local Hackathon, we found that video encoding is a significant bottleneck in the teleoperation recording workflow.
After each episode, we have to wait quite a while for video encoding to complete before proceeding to the next episode.
Depending on the hardware configuration, these delays can be time-consuming and tiring.
To improve the teleoperator's UX, this PR introduces optional batched video encoding.
When enabled, instead of encoding videos immediately after each episode, we can record multiple episodes consecutively and then encode their videos in batches.
This helps alleviate the delays between episodes.
NOTE: the main goal is to improve the human teleoperator experience rather than raw wall-clock speedup.
During batched encoding, teleoperators can use the time more productively, e.g. resting, writing documentation, or planning subsequent demonstrations.
Current workflow:
New workflow with batching (batch size = 10):
Key changes
- Added a new configuration option video_encoding_batch_size
- Added a batch_encode_videos() method
- Adapted record(), with default video_encoding_batch_size = 1
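As a rough illustration of how the batch size partitions a recording session, the sketch below lists the half-open (start, end) episode ranges that would each be encoded in one pass. It assumes recording starts from an empty dataset and that a final partial batch is flushed when recording ends. batch_ranges is a hypothetical helper written for this example, not a function in the PR.

```python
def batch_ranges(num_episodes: int, batch_size: int) -> list[tuple[int, int]]:
    """Return half-open (start, end) episode ranges, one per encoding pass."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [
        (start, min(start + batch_size, num_episodes))
        for start in range(0, num_episodes, batch_size)
    ]


# 25 episodes with batch size 10: two full batches plus a remainder of 5.
batched = batch_ranges(25, 10)

# Batch size 1 reproduces the default: one encoding pass after every episode.
immediate = batch_ranges(3, 1)
```

With the default batch size of 1, every range covers a single episode, which matches the existing encode-after-each-episode behavior.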
Usage
Immediate Encoding (Current)
python -m lerobot.record \
    --robot.type=so101_follower \
    ...
    --dataset.video_encoding_batch_size=1  # Default
Batched Encoding (New Feature)
python -m lerobot.record \
    --robot.type=so101_follower \
    ...
    --dataset.video_encoding_batch_size=10  # Batch every 10 episodes
cc:
@aliberts
@CarolinePascal
@fracapuano