|
1 |
| -import unittest |
| 1 | +import codecs |
| 2 | +import contextlib |
| 3 | +import io |
| 4 | +import re |
2 | 5 | import sys
|
| 6 | +import unittest |
| 7 | +import unittest.mock as mock |
| 8 | +import _testcapi |
3 | 9 | from test.support import import_helper
|
4 | 10 |
|
5 | 11 | _testlimitedcapi = import_helper.import_module('_testlimitedcapi')
|
6 | 12 |
|
7 | 13 | NULL = None
|
| 14 | +BAD_ARGUMENT = re.escape('bad argument type for built-in operation') |
8 | 15 |
|
9 | 16 |
|
10 |
| -class CAPITest(unittest.TestCase): |
| 17 | +class CAPIUnicodeTest(unittest.TestCase): |
11 | 18 | # TODO: Test the following functions:
|
12 | 19 | #
|
13 | 20 | # PyUnicode_BuildEncodingMap
|
@@ -516,5 +523,291 @@ def test_asrawunicodeescapestring(self):
|
516 | 523 | # CRASHES asrawunicodeescapestring(NULL)
|
517 | 524 |
|
518 | 525 |
|
class CAPICodecs(unittest.TestCase):
    """Tests for the codec registry and codec lookup C API wrappers.

    Each test works with a custom "reversing" codec whose search function
    is built in setUp() but only registered by the individual tests, so
    that every test can verify that the registry is left clean afterwards.
    """

    def setUp(self):
        # Encoding names are normalized internally by converting them
        # to lowercase and their hyphens are replaced by underscores.
        self.encoding_name = 'test.test_capi.test_codecs.codec_reversed'
        # Make sure that our custom codec is not already registered (that
        # way we know whether we correctly unregistered the custom codec
        # after a test or not).
        self.assertRaises(LookupError, codecs.lookup, self.encoding_name)
        # create the search function without registering yet
        self._create_custom_codec()

    def _create_custom_codec(self):
        """Build the custom codec and its search function.

        The resulting CodecInfo is stored in ``self.codec_info`` and the
        search function in ``self.search_function``; nothing is registered.
        """
        def codec_encoder(m, errors='strict'):
            # Slice reversal works for both str and bytes, whereas
            # joining reversed(bytes) fails since it yields integers.
            return (m[::-1], len(m))

        def codec_decoder(c, errors='strict'):
            return (c[::-1], len(c))

        class IncrementalEncoder(codecs.IncrementalEncoder):
            def encode(self, input, final=False):
                # Incremental codecs return the converted object only,
                # not the (object, length consumed) pair.
                return codec_encoder(input)[0]

        class IncrementalDecoder(codecs.IncrementalDecoder):
            def decode(self, input, final=False):
                return codec_decoder(input)[0]

        class StreamReader(codecs.StreamReader):
            def encode(self, input, errors='strict'):
                return codec_encoder(input, errors=errors)

            def decode(self, input, errors='strict'):
                return codec_decoder(input, errors=errors)

        class StreamWriter(codecs.StreamWriter):
            def encode(self, input, errors='strict'):
                return codec_encoder(input, errors=errors)

            def decode(self, input, errors='strict'):
                return codec_decoder(input, errors=errors)

        info = codecs.CodecInfo(
            encode=codec_encoder,
            decode=codec_decoder,
            streamreader=StreamReader,
            streamwriter=StreamWriter,
            incrementalencoder=IncrementalEncoder,
            incrementaldecoder=IncrementalDecoder,
            name=self.encoding_name,
        )

        def search_function(encoding):
            if encoding == self.encoding_name:
                return info
            return None

        self.codec_info = info
        self.search_function = search_function

    @contextlib.contextmanager
    def use_custom_encoder(self):
        """Register the custom codec for the duration of a ``with`` block.

        The search function is unregistered on exit even when the body
        raises, so a failing test cannot leak the codec into other tests.
        """
        self.assertRaises(LookupError, codecs.lookup, self.encoding_name)
        codecs.register(self.search_function)
        try:
            yield
        finally:
            codecs.unregister(self.search_function)
        # Only checked on a normal exit so that an exception raised by
        # the body is not masked by this assertion.
        self.assertRaises(LookupError, codecs.lookup, self.encoding_name)

    def test_codec_register(self):
        search_function, encoding = self.search_function, self.encoding_name
        # Schedule the cleanup before registering: codecs.unregister() does
        # nothing for a function that is not registered, and this way the
        # codec cannot leak if an assertion below fails.
        self.addCleanup(codecs.unregister, search_function)
        # register the search function using the C API
        self.assertIsNone(_testcapi.codec_register(search_function))
        self.assertIs(codecs.lookup(encoding), search_function(encoding))
        self.assertEqual(codecs.encode('123', encoding=encoding), '321')
        # unregister the search function using the regular API
        codecs.unregister(search_function)
        self.assertRaises(LookupError, codecs.lookup, encoding)

    def test_codec_unregister(self):
        search_function, encoding = self.search_function, self.encoding_name
        self.assertRaises(LookupError, codecs.lookup, encoding)
        # in case the test fails before the C API call unregisters it
        self.addCleanup(codecs.unregister, search_function)
        # register the search function using the regular API
        codecs.register(search_function)
        self.assertIsNotNone(codecs.lookup(encoding))
        # unregister the search function using the C API
        self.assertIsNone(_testcapi.codec_unregister(search_function))
        self.assertRaises(LookupError, codecs.lookup, encoding)

    def test_codec_known_encoding(self):
        self.assertRaises(LookupError, codecs.lookup, 'unknown-codec')
        self.assertFalse(_testcapi.codec_known_encoding('unknown-codec'))
        self.assertFalse(_testcapi.codec_known_encoding('unknown_codec'))
        self.assertFalse(_testcapi.codec_known_encoding('UNKNOWN-codec'))

        encoding_name = self.encoding_name
        self.assertRaises(LookupError, codecs.lookup, encoding_name)

        codecs.register(self.search_function)
        self.addCleanup(codecs.unregister, self.search_function)

        # The name must be recognized regardless of case and of the
        # hyphen/underscore spelling.
        for name in [
            encoding_name,
            encoding_name.upper(),
            encoding_name.replace('_', '-'),
        ]:
            with self.subTest(name):
                self.assertTrue(_testcapi.codec_known_encoding(name))

    def test_codec_encode(self):
        encode = _testcapi.codec_encode
        self.assertEqual(encode('a', 'utf-8', NULL), b'a')
        self.assertEqual(encode('a', 'utf-8', 'strict'), b'a')
        self.assertEqual(encode('[é]', 'ascii', 'ignore'), b'[]')

        self.assertRaises(TypeError, encode, NULL, 'ascii', 'strict')
        with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
            encode('a', NULL, 'strict')

    def test_codec_decode(self):
        decode = _testcapi.codec_decode

        s = 'a\xa1\u4f60\U0001f600'
        b = s.encode()

        self.assertEqual(decode(b, 'utf-8', 'strict'), s)
        self.assertEqual(decode(b, 'utf-8', NULL), s)
        self.assertEqual(decode(b, 'latin1', 'strict'), b.decode('latin1'))
        self.assertRaises(UnicodeDecodeError, decode, b, 'ascii', 'strict')
        self.assertRaises(UnicodeDecodeError, decode, b, 'ascii', NULL)
        self.assertEqual(decode(b, 'ascii', 'replace'), 'a' + '\ufffd'*9)

        # _codecs.decode() only reports an unknown error handling name when
        # the corresponding error handling function is used; this differs
        # from PyUnicode_Decode() which checks that both the encoding and
        # the error handling name are recognized before even attempting to
        # call the decoder.
        self.assertEqual(decode(b'', 'utf-8', 'unknown-error-handler'), '')
        self.assertEqual(decode(b'a', 'utf-8', 'unknown-error-handler'), 'a')

        self.assertRaises(TypeError, decode, NULL, 'ascii', 'strict')
        with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
            decode(b, NULL, 'strict')

    def test_codec_encoder(self):
        codec_encoder = _testcapi.codec_encoder

        with self.use_custom_encoder():
            encoder = codec_encoder(self.encoding_name)
            self.assertIs(encoder, self.codec_info.encode)

            with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
                codec_encoder(NULL)

    def test_codec_decoder(self):
        codec_decoder = _testcapi.codec_decoder

        with self.use_custom_encoder():
            decoder = codec_decoder(self.encoding_name)
            self.assertIs(decoder, self.codec_info.decode)

            with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
                codec_decoder(NULL)

    def test_codec_incremental_encoder(self):
        codec_incremental_encoder = _testcapi.codec_incremental_encoder

        with self.use_custom_encoder():
            encoding = self.encoding_name

            for errors in ['strict', NULL]:
                with self.subTest(errors):
                    encoder = codec_incremental_encoder(encoding, errors)
                    self.assertIsInstance(
                        encoder, self.codec_info.incrementalencoder)

            with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
                codec_incremental_encoder(NULL, 'strict')

    def test_codec_incremental_decoder(self):
        codec_incremental_decoder = _testcapi.codec_incremental_decoder

        with self.use_custom_encoder():
            encoding = self.encoding_name

            for errors in ['strict', NULL]:
                with self.subTest(errors):
                    decoder = codec_incremental_decoder(encoding, errors)
                    self.assertIsInstance(
                        decoder, self.codec_info.incrementaldecoder)

            with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
                codec_incremental_decoder(NULL, 'strict')

    def test_codec_stream_reader(self):
        codec_stream_reader = _testcapi.codec_stream_reader

        with self.use_custom_encoder():
            encoding, stream = self.encoding_name, io.StringIO()
            for errors in ['strict', NULL]:
                with self.subTest(errors):
                    reader = codec_stream_reader(encoding, stream, errors)
                    self.assertIsInstance(
                        reader, self.codec_info.streamreader)

            with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
                codec_stream_reader(NULL, stream, 'strict')

    def test_codec_stream_writer(self):
        codec_stream_writer = _testcapi.codec_stream_writer

        with self.use_custom_encoder():
            encoding, stream = self.encoding_name, io.StringIO()
            for errors in ['strict', NULL]:
                with self.subTest(errors):
                    writer = codec_stream_writer(encoding, stream, errors)
                    self.assertIsInstance(
                        writer, self.codec_info.streamwriter)

            with self.assertRaisesRegex(TypeError, BAD_ARGUMENT):
                codec_stream_writer(NULL, stream, 'strict')
| 746 | + |
| 747 | + |
class CAPICodecErrors(unittest.TestCase):
    """Tests for the C API wrappers dealing with codec error handlers."""

    def test_codec_register_error(self):
        # for cleaning up between tests
        from _codecs import _unregister_error as _codecs_unregister_error

        # The 'custom' handler must not exist before it is registered.
        with self.assertRaises(LookupError):
            _testcapi.codec_lookup_error('custom')

        def reraise(exc):
            raise exc

        # Wrap the handler in a mock to be able to count its invocations.
        handler_mock = mock.Mock(wraps=reraise)
        _testcapi.codec_register_error('custom', handler_mock)
        self.addCleanup(_codecs_unregister_error, 'custom')

        with self.assertRaises(UnicodeEncodeError):
            codecs.encode('\xff', 'ascii', errors='custom')
        handler_mock.assert_called_once()
        handler_mock.reset_mock()

        with self.assertRaises(UnicodeDecodeError):
            codecs.decode(b'\xff', 'ascii', errors='custom')
        handler_mock.assert_called_once()

    # _codecs._unregister_error directly delegates to the internal C
    # function so a Python-level function test is sufficient (it is
    # tested in test_codeccallbacks).

    def test_codec_lookup_error(self):
        lookup = _testcapi.codec_lookup_error
        # Each (name, handler) pair: looking up the name must return the
        # very same object as the one exposed by the codecs module.
        expected_handlers = (
            (NULL, codecs.strict_errors),
            ('strict', codecs.strict_errors),
            ('ignore', codecs.ignore_errors),
            ('replace', codecs.replace_errors),
            ('xmlcharrefreplace', codecs.xmlcharrefreplace_errors),
            ('namereplace', codecs.namereplace_errors),
        )
        for name, expected in expected_handlers:
            self.assertIs(lookup(name), expected)
        with self.assertRaises(LookupError):
            lookup('unknown')

    def test_codec_error_handlers(self):
        # The following case, built with an empty object string, is
        # disabled because it currently crashes.
        # See: https://github.com/python/cpython/issues/123378
        #     UnicodeEncodeError('bad', '', 0, 1, 'reason'),
        exceptions = [
            UnicodeEncodeError('bad', 'x', 0, 1, 'reason'),
            UnicodeEncodeError('bad', 'xyz123', 0, 1, 'reason'),
            UnicodeEncodeError('bad', 'xyz123', 1, 4, 'reason'),
        ]

        # The strict handler re-raises the exception it is given.
        strict_handler = _testcapi.codec_strict_errors
        for exc in exceptions:
            with self.subTest(handler=strict_handler, exc=exc):
                with self.assertRaises(UnicodeEncodeError):
                    strict_handler(exc)

        # Every other built-in handler is expected to return a tuple.
        recovering_handlers = (
            _testcapi.codec_ignore_errors,
            _testcapi.codec_replace_errors,
            _testcapi.codec_xmlcharrefreplace_errors,
            _testlimitedcapi.codec_namereplace_errors,
        )
        for handler in recovering_handlers:
            for exc in exceptions:
                with self.subTest(handler=handler, exc=exc):
                    self.assertIsInstance(handler(exc), tuple)
| 810 | + |
| 811 | + |
519 | 812 | if __name__ == "__main__":
|
520 | 813 | unittest.main()
|
0 commit comments