-
-
Notifications
You must be signed in to change notification settings - Fork 2.3k
Add support for animated WebP files #2761
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
6e47661
482d803
cd12a48
356da80
b46cf52
e534991
c18d26b
80b9624
247c2f5
5b2dd29
b3e90a3
405d1a6
acc4334
c5e6211
e32fb4f
c9258d6
e75c386
8e207d5
28bec69
b3565d8
cf31e70
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -28,32 +28,254 @@ class WebPImageFile(ImageFile.ImageFile): | |
format_description = "WebP image" | ||
|
||
def _open(self): | ||
data, width, height, self.mode, icc_profile, exif = \ | ||
_webp.WebPDecode(self.fp.read()) | ||
if not _webp.HAVE_WEBPMUX: | ||
# Legacy mode | ||
data, width, height, self.mode = _webp.WebPDecode(self.fp.read()) | ||
self.size = width, height | ||
self.fp = BytesIO(data) | ||
self.tile = [("raw", (0, 0) + self.size, 0, self.mode)] | ||
return | ||
|
||
# Use the newer AnimDecoder API to parse the (possibly) animated file, | ||
# and access muxed chunks like ICC/EXIF/XMP. | ||
self._decoder = _webp.WebPAnimDecoder(self.fp.read()) | ||
|
||
# Get info from decoder | ||
width, height, loop_count, bgcolor, frame_count, mode = self._decoder.get_info() | ||
self.size = width, height | ||
self.info["loop"] = loop_count | ||
bg_a, bg_r, bg_g, bg_b = \ | ||
(bgcolor >> 24) & 0xFF, \ | ||
(bgcolor >> 16) & 0xFF, \ | ||
(bgcolor >> 8) & 0xFF, \ | ||
bgcolor & 0xFF | ||
self.info["background"] = (bg_r, bg_g, bg_b, bg_a) | ||
self._n_frames = frame_count | ||
self.mode = mode | ||
self.tile = [] | ||
|
||
# Attempt to read ICC / EXIF / XMP chunks from file | ||
icc_profile = self._decoder.get_chunk("ICCP") | ||
exif = self._decoder.get_chunk("EXIF") | ||
xmp = self._decoder.get_chunk("XMP ") | ||
if icc_profile: | ||
self.info["icc_profile"] = icc_profile | ||
if exif: | ||
self.info["exif"] = exif | ||
if xmp: | ||
self.info["xmp"] = xmp | ||
|
||
self.size = width, height | ||
self.fp = BytesIO(data) | ||
self.tile = [("raw", (0, 0) + self.size, 0, self.mode)] | ||
# Initialize seek state | ||
self._reset(reset=False) | ||
self.seek(0) | ||
|
||
def _getexif(self): | ||
from .JpegImagePlugin import _getexif | ||
return _getexif(self) | ||
|
||
@property | ||
def n_frames(self): | ||
if not _webp.HAVE_WEBPMUX: | ||
return 1 | ||
return self._n_frames | ||
|
||
def _save(im, fp, filename): | ||
image_mode = im.mode | ||
if im.mode not in _VALID_WEBP_MODES: | ||
raise IOError("cannot write mode %s as WEBP" % image_mode) | ||
@property | ||
def is_animated(self): | ||
if not _webp.HAVE_WEBPMUX: | ||
return False | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ... and also these two lines can be removed. |
||
return self._n_frames > 1 | ||
|
||
def seek(self, frame): | ||
if not _webp.HAVE_WEBPMUX: | ||
return super(WebPImageFile, self).seek(frame) | ||
|
||
# Perform some simple checks first | ||
if frame >= self._n_frames: | ||
raise EOFError("attempted to seek beyond end of sequence") | ||
if frame < 0: | ||
raise EOFError("negative frame index is not valid") | ||
|
||
# Set logical frame to requested position | ||
self.__logical_frame = frame | ||
|
||
def _reset(self, reset=True): | ||
if reset: | ||
self._decoder.reset() | ||
self.__physical_frame = 0 | ||
self.__loaded = -1 | ||
self.__timestamp = 0 | ||
|
||
def _get_next(self): | ||
# Get next frame | ||
ret = self._decoder.get_next() | ||
self.__physical_frame += 1 | ||
|
||
# Check if an error occurred | ||
if ret is None: | ||
self._reset() # Reset just to be safe | ||
self.seek(0) | ||
raise EOFError("failed to decode next frame in WebP file") | ||
|
||
# Compute duration | ||
data, timestamp = ret | ||
duration = timestamp - self.__timestamp | ||
self.__timestamp = timestamp | ||
timestamp -= duration # libwebp gives frame end, adjust to start of frame | ||
return data, timestamp, duration | ||
|
||
def _seek(self, frame): | ||
if self.__physical_frame == frame: | ||
return # Nothing to do | ||
|
||
if frame < self.__physical_frame: | ||
# Rewind to beginning | ||
self._reset() | ||
|
||
# Advance to the requested frame | ||
while self.__physical_frame < frame: | ||
self._get_next() | ||
|
||
def load(self): | ||
if _webp.HAVE_WEBPMUX: | ||
if self.__loaded != self.__logical_frame: | ||
self._seek(self.__logical_frame) | ||
|
||
# We need to load the image data for this frame | ||
data, timestamp, duration = self._get_next() | ||
self.info["timestamp"] = timestamp | ||
self.info["duration"] = duration | ||
self.__loaded = self.__logical_frame | ||
|
||
# Set tile | ||
self.fp = BytesIO(data) | ||
self.tile = [("raw", (0, 0) + self.size, 0, self.mode)] | ||
|
||
return super(WebPImageFile, self).load() | ||
|
||
def tell(self): | ||
if not _webp.HAVE_WEBPMUX: | ||
return super(WebPImageFile, self).tell() | ||
|
||
return self.__logical_frame | ||
|
||
def _save_all(im, fp, filename): | ||
encoderinfo = im.encoderinfo.copy() | ||
append_images = encoderinfo.get("append_images", []) | ||
|
||
# If total frame count is 1, then save using the legacy API, which | ||
# will preserve non-alpha modes | ||
total = 0 | ||
for ims in [im]+append_images: | ||
total += 1 if not hasattr(ims, "n_frames") else ims.n_frames | ||
if total == 1: | ||
_save(im, fp, filename) | ||
return | ||
|
||
background = encoderinfo.get("background", (0, 0, 0, 0)) | ||
duration = im.encoderinfo.get("duration", 0) | ||
loop = im.encoderinfo.get("loop", 0) | ||
minimize_size = im.encoderinfo.get("minimize_size", False) | ||
kmin = im.encoderinfo.get("kmin", None) | ||
kmax = im.encoderinfo.get("kmax", None) | ||
allow_mixed = im.encoderinfo.get("allow_mixed", False) | ||
verbose = False | ||
lossless = im.encoderinfo.get("lossless", False) | ||
quality = im.encoderinfo.get("quality", 80) | ||
method = im.encoderinfo.get("method", 0) | ||
icc_profile = im.encoderinfo.get("icc_profile", "") | ||
exif = im.encoderinfo.get("exif", "") | ||
xmp = im.encoderinfo.get("xmp", "") | ||
if allow_mixed: | ||
lossless = False | ||
|
||
# Sensible keyframe defaults are from gif2webp.c script | ||
if kmin is None: | ||
kmin = 9 if lossless else 3 | ||
if kmax is None: | ||
kmax = 17 if lossless else 5 | ||
|
||
# Validate background color | ||
if (not isinstance(background, (list, tuple)) or len(background) != 4 or | ||
not all(v >= 0 and v < 256 for v in background)): | ||
raise IOError("Background color is not an RGBA tuple clamped to (0-255): %s" % str(background)) | ||
bg_r, bg_g, bg_b, bg_a = background | ||
background = (bg_a << 24) | (bg_r << 16) | (bg_g << 8) | (bg_b << 0) # Convert to packed uint | ||
|
||
# Setup the WebP animation encoder | ||
enc = _webp.WebPAnimEncoder( | ||
im.size[0], im.size[1], | ||
background, | ||
loop, | ||
minimize_size, | ||
kmin, kmax, | ||
allow_mixed, | ||
verbose | ||
) | ||
|
||
# Add each frame | ||
frame_idx = 0 | ||
timestamp = 0 | ||
cur_idx = im.tell() | ||
try: | ||
for ims in [im]+append_images: | ||
# Get # of frames in this image | ||
if not hasattr(ims, "n_frames"): | ||
nfr = 1 | ||
else: | ||
nfr = ims.n_frames | ||
|
||
for idx in range(nfr): | ||
ims.seek(idx) | ||
ims.load() | ||
|
||
# Make sure image mode is supported | ||
frame = ims | ||
if not ims.mode in _VALID_WEBP_MODES: | ||
alpha = ims.mode == 'P' and 'A' in ims.im.getpalettemode() | ||
frame = ims.convert('RGBA' if alpha else 'RGB') | ||
|
||
# Append the frame to the animation encoder | ||
enc.add( | ||
frame.tobytes(), | ||
timestamp, | ||
frame.size[0], frame.size[1], | ||
frame.mode, | ||
lossless, | ||
quality, | ||
method | ||
) | ||
|
||
# Update timestamp and frame index | ||
timestamp += duration[frame_idx] if isinstance(duration, (list, tuple)) else duration | ||
frame_idx += 1 | ||
|
||
finally: | ||
im.seek(cur_idx) | ||
|
||
# Force encoder to flush frames | ||
enc.add( | ||
None, | ||
timestamp, | ||
0, 0, "", lossless, quality, 0 | ||
) | ||
|
||
# Get the final output from the encoder | ||
data = enc.assemble(icc_profile, exif, xmp) | ||
if data is None: | ||
raise IOError("cannot write file as WEBP (encoder returned None)") | ||
|
||
fp.write(data) | ||
|
||
def _save(im, fp, filename): | ||
lossless = im.encoderinfo.get("lossless", False) | ||
quality = im.encoderinfo.get("quality", 80) | ||
icc_profile = im.encoderinfo.get("icc_profile", "") | ||
exif = im.encoderinfo.get("exif", "") | ||
xmp = im.encoderinfo.get("xmp", "") | ||
|
||
if im.mode not in _VALID_WEBP_MODES: | ||
alpha = im.mode == 'P' and 'A' in im.im.getpalettemode() | ||
im = im.convert('RGBA' if alpha else 'RGB') | ||
|
||
data = _webp.WebPEncode( | ||
im.tobytes(), | ||
|
@@ -63,7 +285,8 @@ def _save(im, fp, filename): | |
float(quality), | ||
im.mode, | ||
icc_profile, | ||
exif | ||
exif, | ||
xmp | ||
) | ||
if data is None: | ||
raise IOError("cannot write file as WEBP (encoder returned None)") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. WEBP -> WebP |
||
|
@@ -73,6 +296,7 @@ def _save(im, fp, filename): | |
|
||
Image.register_open(WebPImageFile.format, WebPImageFile, _accept) | ||
Image.register_save(WebPImageFile.format, _save) | ||
|
||
if _webp.HAVE_WEBPMUX: | ||
Image.register_save_all(WebPImageFile.format, _save_all) | ||
Image.register_extension(WebPImageFile.format, ".webp") | ||
Image.register_mime(WebPImageFile.format, "image/webp") |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,10 @@ def test_version(self): | |
_webp.WebPDecoderBuggyAlpha() | ||
|
||
def test_read_rgb(self): | ||
""" | ||
Can we read a RGB mode webp file without error. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we read a RGB mode webp file without error. |
||
Does it have the bits we expect? | ||
""" | ||
|
||
file_path = "Tests/images/hopper.webp" | ||
image = Image.open(file_path) | ||
|
@@ -46,9 +50,7 @@ def test_write_rgb(self): | |
temp_file = self.tempfile("temp.webp") | ||
|
||
hopper("RGB").save(temp_file) | ||
|
||
image = Image.open(temp_file) | ||
image.load() | ||
|
||
self.assertEqual(image.mode, "RGB") | ||
self.assertEqual(image.size, (128, 128)) | ||
|
@@ -70,19 +72,66 @@ def test_write_rgb(self): | |
# the image. The old lena images for WebP are showing ~16 on | ||
# Ubuntu, the jpegs are showing ~18. | ||
target = hopper("RGB") | ||
self.assert_image_similar(image, target, 12) | ||
self.assert_image_similar(image, target, 12.0) | ||
|
||
def test_write_unsupported_mode_L(self): | ||
""" | ||
Saving a black-and-white file to webp format should work, and be | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. webp -> WebP |
||
similar to the original file. | ||
""" | ||
|
||
def test_write_unsupported_mode(self): | ||
temp_file = self.tempfile("temp.webp") | ||
hopper("L").save(temp_file) | ||
image = Image.open(temp_file) | ||
|
||
im = hopper("L") | ||
self.assertRaises(IOError, im.save, temp_file) | ||
self.assertEqual(image.mode, "RGB") | ||
self.assertEqual(image.size, (128, 128)) | ||
self.assertEqual(image.format, "WEBP") | ||
|
||
image.load() | ||
image.getdata() | ||
target = hopper("L").convert("RGB") | ||
|
||
self.assert_image_similar(image, target, 10.0) | ||
|
||
def test_write_unsupported_mode_P(self): | ||
""" | ||
Saving a palette-based file to webp format should work, and be | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. webp -> WebP |
||
similar to the original file. | ||
""" | ||
|
||
temp_file = self.tempfile("temp.webp") | ||
hopper("P").save(temp_file) | ||
image = Image.open(temp_file) | ||
|
||
self.assertEqual(image.mode, "RGB") | ||
self.assertEqual(image.size, (128, 128)) | ||
self.assertEqual(image.format, "WEBP") | ||
|
||
image.load() | ||
image.getdata() | ||
target = hopper("P").convert("RGB") | ||
|
||
self.assert_image_similar(image, target, 50.0) | ||
|
||
def test_WebPEncode_with_invalid_args(self): | ||
""" | ||
Calling encoder functions with no arguments should result in an error. | ||
""" | ||
|
||
if _webp.HAVE_WEBPMUX: | ||
self.assertRaises(TypeError, _webp.WebPAnimEncoder) | ||
self.assertRaises(TypeError, _webp.WebPEncode) | ||
|
||
def test_WebPDecode_with_invalid_args(self): | ||
""" | ||
Calling decoder functions with no arguments should result in an error. | ||
""" | ||
|
||
if _webp.HAVE_WEBPMUX: | ||
self.assertRaises(TypeError, _webp.WebPAnimDecoder) | ||
self.assertRaises(TypeError, _webp.WebPDecode) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about setting
self._n_frames = 1
in_open
whennot _webp.HAVE_WEBPANIM
?Then these two lines can be removed...