From d3c075fb0acfb644d3e938b30f1023f48a11a095 Mon Sep 17 00:00:00 2001 From: Gabriel Levcovitz Date: Sun, 5 Nov 2023 13:44:27 -0300 Subject: [PATCH] refactor(scripts): implement ScriptContext --- hathor/transaction/scripts/execute.py | 14 +- hathor/transaction/scripts/opcode.py | 220 ++++++++++--------- hathor/transaction/scripts/script_context.py | 25 +++ tests/tx/test_scripts.py | 160 +++++++------- 4 files changed, 223 insertions(+), 196 deletions(-) create mode 100644 hathor/transaction/scripts/script_context.py diff --git a/hathor/transaction/scripts/execute.py b/hathor/transaction/scripts/execute.py index 18af61c10..23109afbc 100644 --- a/hathor/transaction/scripts/execute.py +++ b/hathor/transaction/scripts/execute.py @@ -16,7 +16,7 @@ from typing import NamedTuple, Optional, Union from hathor.transaction import BaseTransaction, Transaction, TxInput -from hathor.transaction.exceptions import DataIndexError, FinalStackInvalid, InvalidScriptError, OutOfData, ScriptError +from hathor.transaction.exceptions import DataIndexError, FinalStackInvalid, InvalidScriptError, OutOfData class ScriptExtras(NamedTuple): @@ -54,21 +54,19 @@ def execute_eval(data: bytes, log: list[str], extras: ScriptExtras) -> None: :raises ScriptError: case opcode is not found :raises FinalStackInvalid: case the evaluation fails """ - from hathor.transaction.scripts.opcode import MAP_OPCODE_TO_FN, Opcode + from hathor.transaction.scripts.opcode import Opcode, execute_op_code + from hathor.transaction.scripts.script_context import ScriptContext stack: Stack = [] + context = ScriptContext(stack=stack, logs=log, extras=extras) data_len = len(data) pos = 0 while pos < data_len: opcode, pos = get_script_op(pos, data, stack) if Opcode.is_pushdata(opcode): continue - # this is an opcode manipulating the stack - fn = MAP_OPCODE_TO_FN.get(opcode, None) - if fn is None: - # throw error - raise ScriptError('unknown opcode') - fn(stack, log, extras) + # this is an opcode manipulating the stack + execute_op_code(Opcode(opcode), context) evaluate_final_stack(stack, log) diff --git a/hathor/transaction/scripts/opcode.py b/hathor/transaction/scripts/opcode.py index 5af5d09e9..3c185f5a5 100644 --- a/hathor/transaction/scripts/opcode.py +++ b/hathor/transaction/scripts/opcode.py @@ -15,7 +15,6 @@ import datetime import struct from enum import IntEnum -from typing import Callable from cryptography.exceptions import InvalidSignature from cryptography.hazmat.primitives import hashes @@ -38,14 +37,8 @@ TimeLocked, VerifyFailed, ) -from hathor.transaction.scripts.execute import ( - ScriptExtras, - Stack, - binary_to_int, - decode_opn, - get_data_value, - get_script_op, -) +from hathor.transaction.scripts.execute import Stack, binary_to_int, decode_opn, get_data_value, get_script_op +from hathor.transaction.scripts.script_context import ScriptContext class Opcode(IntEnum): @@ -157,7 +150,7 @@ def op_pushdata1(position: int, full_data: bytes, stack: Stack) -> int: return new_pos -def op_dup(stack: Stack, log: list[str], extras: ScriptExtras) -> None: +def op_dup(context: ScriptContext) -> None: """Duplicates item on top of stack :param stack: the stack used when evaluating the script @@ -165,12 +158,12 @@ def op_dup(stack: Stack, log: list[str], extras: ScriptExtras) -> None: :raises MissingStackItems: if there's no element on stack """ - if not len(stack): + if not len(context.stack): raise MissingStackItems('OP_DUP: empty stack') - stack.append(stack[-1]) + context.stack.append(context.stack[-1]) -def op_greaterthan_timestamp(stack: Stack, log: list[str], extras: ScriptExtras) -> None: +def op_greaterthan_timestamp(context: ScriptContext) -> None: """Check whether transaction's timestamp is greater than the top of stack The top of stack must be a big-endian u32int. @@ -180,17 +173,17 @@ def op_greaterthan_timestamp(stack: Stack, log: list[str], extras: ScriptExtras) :raises MissingStackItems: if there's no element on stack """ - if not len(stack): + if not len(context.stack): raise MissingStackItems('OP_GREATERTHAN_TIMESTAMP: empty stack') - buf = stack.pop() + buf = context.stack.pop() assert isinstance(buf, bytes) (timelock,) = struct.unpack('!I', buf) - if extras.tx.timestamp <= timelock: + if context.extras.tx.timestamp <= timelock: raise TimeLocked('The output is locked until {}'.format( datetime.datetime.fromtimestamp(timelock).strftime("%m/%d/%Y %I:%M:%S %p"))) -def op_equalverify(stack: Stack, log: list[str], extras: ScriptExtras) -> None: +def op_equalverify(context: ScriptContext) -> None: """Verifies top 2 elements from stack are equal :param stack: the stack used when evaluating the script @@ -199,15 +192,15 @@ def op_equalverify(stack: Stack, log: list[str], extras: ScriptExtras) -> None: :raises MissingStackItems: if there aren't 2 element on stack :raises EqualVerifyFailed: items don't match """ - if len(stack) < 2: - raise MissingStackItems('OP_EQUALVERIFY: need 2 elements on stack, currently {}'.format(len(stack))) - op_equal(stack, log, extras) - is_equal = stack.pop() + if len(context.stack) < 2: + raise MissingStackItems('OP_EQUALVERIFY: need 2 elements on stack, currently {}'.format(len(context.stack))) + op_equal(context) + is_equal = context.stack.pop() if not is_equal: raise EqualVerifyFailed('Failed to verify if elements are equal') -def op_equal(stack: Stack, log: list[str], extras: ScriptExtras) -> None: +def op_equal(context: ScriptContext) -> None: """Verifies top 2 elements from stack are equal In case they are the same, we push 1 to the stack and push 0 if they are different @@ -215,20 +208,20 @@ def op_equal(stack: Stack, log: list[str], extras: ScriptExtras) -> None: :param stack: the stack used when evaluating the script :type stack: list[] """ - if len(stack) < 2: - raise MissingStackItems('OP_EQUAL: need 2 elements on stack, currently {}'.format(len(stack))) - elem1 = stack.pop() - elem2 = stack.pop() + if len(context.stack) < 2: + raise MissingStackItems('OP_EQUAL: need 2 elements on stack, currently {}'.format(len(context.stack))) + elem1 = context.stack.pop() + elem2 = context.stack.pop() assert isinstance(elem1, bytes) assert isinstance(elem2, bytes) if elem1 == elem2: - stack.append(1) + context.stack.append(1) else: - stack.append(0) - log.append('OP_EQUAL: failed. elements: {} {}'.format(elem1.hex(), elem2.hex())) + context.stack.append(0) + context.logs.append('OP_EQUAL: failed. elements: {} {}'.format(elem1.hex(), elem2.hex())) -def op_checksig(stack: Stack, log: list[str], extras: ScriptExtras) -> None: +def op_checksig(context: ScriptContext) -> None: """Verifies public key and signature match. Expects public key to be on top of stack, followed by signature. If they match, put 1 on stack (meaning True); otherwise, push 0 (False) @@ -241,10 +234,10 @@ def op_checksig(stack: Stack, log: list[str], extras: ScriptExtras) -> None: :return: if they don't match, return error message :rtype: string """ - if len(stack) < 2: - raise MissingStackItems('OP_CHECKSIG: need 2 elements on stack, currently {}'.format(len(stack))) - pubkey = stack.pop() - signature = stack.pop() + if len(context.stack) < 2: + raise MissingStackItems('OP_CHECKSIG: need 2 elements on stack, currently {}'.format(len(context.stack))) + pubkey = context.stack.pop() + signature = context.stack.pop() assert isinstance(pubkey, bytes) assert isinstance(signature, bytes) @@ -256,16 +249,16 @@ def op_checksig(stack: Stack, log: list[str], extras: ScriptExtras) -> None: # pubkey is not compressed public key raise ScriptError('OP_CHECKSIG: pubkey is not a public key') from e try: - public_key.verify(signature, extras.tx.get_sighash_all_data(), ec.ECDSA(hashes.SHA256())) + public_key.verify(signature, context.extras.tx.get_sighash_all_data(), ec.ECDSA(hashes.SHA256())) # valid, push true to stack - stack.append(1) + context.stack.append(1) except InvalidSignature: # invalid, push false to stack - stack.append(0) - log.append('OP_CHECKSIG: failed') + context.stack.append(0) + context.logs.append('OP_CHECKSIG: failed') -def op_hash160(stack: Stack, log: list[str], extras: ScriptExtras) -> None: +def op_hash160(context: ScriptContext) -> None: """Top stack item is hashed twice: first with SHA-256 and then with RIPEMD-160. Result is pushed back to stack. @@ -274,15 +267,15 @@ def op_hash160(stack: Stack, log: list[str], extras: ScriptExtras) -> None: :raises MissingStackItems: if there's no element on stack """ - if not len(stack): + if not len(context.stack): raise MissingStackItems('OP_HASH160: empty stack') - elem1 = stack.pop() + elem1 = context.stack.pop() assert isinstance(elem1, bytes) new_elem = get_hash160(elem1) - stack.append(new_elem) + context.stack.append(new_elem) -def op_checkdatasig(stack: Stack, log: list[str], extras: ScriptExtras) -> None: +def op_checkdatasig(context: ScriptContext) -> None: """Verifies public key, signature and data match. Expects public key to be on top of stack, followed by signature and data. If they match, put data on stack; otherwise, fail. @@ -292,11 +285,11 @@ def op_checkdatasig(stack: Stack, log: list[str], extras: ScriptExtras) -> None: :raises MissingStackItems: if there aren't 3 element on stack :raises OracleChecksigFailed: invalid signature, given data and public key """ - if len(stack) < 3: - raise MissingStackItems('OP_CHECKDATASIG: need 3 elements on stack, currently {}'.format(len(stack))) - pubkey = stack.pop() - signature = stack.pop() - data = stack.pop() + if len(context.stack) < 3: + raise MissingStackItems('OP_CHECKDATASIG: need 3 elements on stack, currently {}'.format(len(context.stack))) + pubkey = context.stack.pop() + signature = context.stack.pop() + data = context.stack.pop() assert isinstance(pubkey, bytes) assert isinstance(signature, bytes) assert isinstance(data, bytes) @@ -311,12 +304,12 @@ def op_checkdatasig(stack: Stack, log: list[str], extras: ScriptExtras) -> None: try: public_key.verify(signature, data, ec.ECDSA(hashes.SHA256())) # valid, push true to stack - stack.append(data) + context.stack.append(data) except InvalidSignature as e: raise OracleChecksigFailed from e -def op_data_strequal(stack: Stack, log: list[str], extras: ScriptExtras) -> None: +def op_data_strequal(context: ScriptContext) -> None: """Equivalent to an OP_GET_DATA_STR followed by an OP_EQUALVERIFY. Consumes three parameters from stack: . Gets the kth value @@ -329,11 +322,11 @@ def op_data_strequal(stack: Stack, log: list[str], extras: ScriptExtras) -> None :raises MissingStackItems: if there aren't 3 element on stack :raises VerifyFailed: verification failed """ - if len(stack) < 3: - raise MissingStackItems('OP_DATA_STREQUAL: need 3 elements on stack, currently {}'.format(len(stack))) - value = stack.pop() - data_k = stack.pop() - data = stack.pop() + if len(context.stack) < 3: + raise MissingStackItems('OP_DATA_STREQUAL: need 3 elements on stack, currently {}'.format(len(context.stack))) + value = context.stack.pop() + data_k = context.stack.pop() + data = context.stack.pop() assert isinstance(value, bytes) assert isinstance(data, bytes) @@ -344,10 +337,10 @@ def op_data_strequal(stack: Stack, log: list[str], extras: ScriptExtras) -> None if data_value != value: raise VerifyFailed('OP_DATA_STREQUAL: {} x {}'.format(data_value.decode('utf-8'), value.decode('utf-8'))) - stack.append(data) + context.stack.append(data) -def op_data_greaterthan(stack: Stack, log: list[str], extras: ScriptExtras) -> None: +def op_data_greaterthan(context: ScriptContext) -> None: """Equivalent to an OP_GET_DATA_INT followed by an OP_GREATERTHAN. Consumes three parameters from stack: . Gets the kth value @@ -359,11 +352,11 @@ def op_data_greaterthan(stack: Stack, log: list[str], extras: ScriptExtras) -> N :raises MissingStackItems: if there aren't 3 element on stack :raises VerifyFailed: verification failed """ - if len(stack) < 3: - raise MissingStackItems('OP_DATA_GREATERTHAN: need 3 elements on stack, currently {}'.format(len(stack))) - value = stack.pop() - data_k = stack.pop() - data = stack.pop() + if len(context.stack) < 3: + raise MissingStackItems(f'OP_DATA_GREATERTHAN: need 3 elements on stack, currently {len(context.stack)}') + value = context.stack.pop() + data_k = context.stack.pop() + data = context.stack.pop() assert isinstance(value, bytes) assert isinstance(data, bytes) @@ -380,10 +373,10 @@ def op_data_greaterthan(stack: Stack, log: list[str], extras: ScriptExtras) -> N if data_int <= value_int: raise VerifyFailed('op_data_greaterthan: {} x {}'.format(data_int, value_int)) - stack.append(data) + context.stack.append(data) -def op_data_match_interval(stack: Stack, log: list[str], extras: ScriptExtras) -> None: +def op_data_match_interval(stack: Stack) -> None: """Equivalent to an OP_GET_DATA_INT followed by an OP_MATCH_INTERVAL. :param stack: the stack used when evaluating the script @@ -435,7 +428,7 @@ def op_data_match_interval(stack: Stack, log: list[str], extras: ScriptExtras) - stack.append(last_pubkey) -def op_data_match_value(stack: Stack, log: list[str], extras: ScriptExtras) -> None: +def op_data_match_value(context: ScriptContext) -> None: """Equivalent to an OP_GET_DATA_STR followed by an OP_MATCH_VALUE. :param stack: the stack used when evaluating the script @@ -444,25 +437,25 @@ def op_data_match_value(stack: Stack, log: list[str], extras: ScriptExtras) -> N :raises MissingStackItems: if there aren't 3 element on stack :raises VerifyFailed: verification failed """ - if len(stack) < 1: + if len(context.stack) < 1: raise MissingStackItems('OP_DATA_MATCH_VALUE: empty stack') - data_n_items = stack.pop() + data_n_items = context.stack.pop() assert isinstance(data_n_items, bytes) # TODO test this can be transformed to integer n_items = data_n_items[0] # number of items in stack that will be used will_use = 2 * n_items + 3 # n data_points, n + 1 keys, k and data - if len(stack) < will_use: + if len(context.stack) < will_use: raise MissingStackItems('OP_DATA_MATCH_VALUE: need {} elements on stack, currently {}'.format( - will_use, len(stack))) + will_use, len(context.stack))) items = {} try: for _ in range(n_items): - pubkey = stack.pop() - buf = stack.pop() + pubkey = context.stack.pop() + buf = context.stack.pop() assert isinstance(pubkey, (str, bytes)) assert isinstance(buf, bytes) value = binary_to_int(buf) @@ -471,20 +464,20 @@ def op_data_match_value(stack: Stack, log: list[str], extras: ScriptExtras) -> N raise VerifyFailed from e # one pubkey is left on stack - last_pubkey = stack.pop() + last_pubkey = context.stack.pop() # next two items are data index and data - data_k = stack.pop() - data = stack.pop() + data_k = context.stack.pop() + data = context.stack.pop() assert isinstance(data_k, int) assert isinstance(data, bytes) data_value = get_data_value(data_k, data) data_int = binary_to_int(data_value) winner_pubkey = items.get(data_int, last_pubkey) assert isinstance(winner_pubkey, (str, bytes)) - stack.append(winner_pubkey) + context.stack.append(winner_pubkey) -def op_find_p2pkh(stack: Stack, log: list[str], extras: ScriptExtras) -> None: +def op_find_p2pkh(context: ScriptContext) -> None: """Checks whether the current transaction has an output with a P2PKH script with the given public key hash and the same amount as the input. @@ -500,28 +493,28 @@ def op_find_p2pkh(stack: Stack, log: list[str], extras: ScriptExtras) -> None: :raises MissingStackItems: if stack is empty :raises VerifyFailed: verification failed """ - if not len(stack): + if not len(context.stack): raise MissingStackItems('OP_FIND_P2PKH: empty stack') from hathor.transaction.scripts import P2PKH - spent_tx = extras.spent_tx - txin = extras.txin - tx = extras.tx + spent_tx = context.extras.spent_tx + txin = context.extras.txin + tx = context.extras.tx contract_value = spent_tx.outputs[txin.index].value - address = stack.pop() + address = context.stack.pop() address_b58 = get_address_b58_from_bytes(address) for output in tx.outputs: p2pkh_out = P2PKH.parse_script(output.script) if p2pkh_out: if p2pkh_out.address == address_b58 and output.value == contract_value: - stack.append(1) + context.stack.append(1) return # didn't find any match raise VerifyFailed -def op_checkmultisig(stack: Stack, log: list[str], extras: ScriptExtras) -> None: +def op_checkmultisig(context: ScriptContext) -> None: """Checks if it has the minimum signatures required and if all of them are valid :param stack: the stack used when evaluating the script @@ -532,11 +525,11 @@ def op_checkmultisig(stack: Stack, log: list[str], extras: ScriptExtras) -> None """ settings = get_settings() - if not len(stack): + if not len(context.stack): raise MissingStackItems('OP_CHECKMULTISIG: empty stack') # Pop the quantity of pubkeys - pubkey_count = stack.pop() + pubkey_count = context.stack.pop() if not isinstance(pubkey_count, int): raise InvalidStackData('OP_CHECKMULTISIG: pubkey count should be an integer') @@ -548,20 +541,20 @@ def op_checkmultisig(stack: Stack, log: list[str], extras: ScriptExtras) -> None ) ) - if len(stack) < pubkey_count: + if len(context.stack) < pubkey_count: raise MissingStackItems('OP_CHECKMULTISIG: not enough public keys on the stack') # Get all pubkeys pubkeys = [] for _ in range(pubkey_count): - pubkey_bytes = stack.pop() + pubkey_bytes = context.stack.pop() pubkeys.append(pubkey_bytes) - if not len(stack): + if not len(context.stack): raise MissingStackItems('OP_CHECKMULTISIG: less elements than should on the stack') # Pop the quantity of signatures required - signatures_count = stack.pop() + signatures_count = context.stack.pop() if not isinstance(signatures_count, int): raise InvalidStackData('OP_CHECKMULTISIG: signatures count should be an integer') @@ -574,13 +567,13 @@ def op_checkmultisig(stack: Stack, log: list[str], extras: ScriptExtras) -> None ) # Error if we don't have the minimum quantity of signatures - if len(stack) < signatures_count: + if len(context.stack) < signatures_count: raise MissingStackItems('OP_CHECKMULTISIG: not enough signatures on the stack') # Get all signatures signatures = [] for _ in range(signatures_count): - signature_bytes = stack.pop() + signature_bytes = context.stack.pop() signatures.append(signature_bytes) # For each signature we check if it's valid with one of the public keys @@ -590,21 +583,21 @@ def op_checkmultisig(stack: Stack, log: list[str], extras: ScriptExtras) -> None while pubkey_index < len(pubkeys): pubkey = pubkeys[pubkey_index] new_stack = [signature, pubkey] - op_checksig(new_stack, log, extras) + op_checksig(ScriptContext(stack=new_stack, logs=context.logs, extras=context.extras)) result = new_stack.pop() pubkey_index += 1 if result == 1: break else: # finished all pubkeys and did not verify all signatures - stack.append(0) + context.stack.append(0) return # If all signatures are valids we push 1 - stack.append(1) + context.stack.append(1) -def op_integer(opcode: int, stack: Stack, log: list[str], extras: ScriptExtras) -> None: +def op_integer(opcode: int, stack: Stack) -> None: """ Appends an integer to the stack We get the opcode comparing to all integers opcodes @@ -624,17 +617,26 @@ def op_integer(opcode: int, stack: Stack, log: list[str], extras: ScriptExtras) raise ScriptError(e) from e -MAP_OPCODE_TO_FN: dict[int, Callable[[Stack, list[str], ScriptExtras], None]] = { - Opcode.OP_DUP: op_dup, - Opcode.OP_EQUAL: op_equal, - Opcode.OP_EQUALVERIFY: op_equalverify, - Opcode.OP_CHECKSIG: op_checksig, - Opcode.OP_HASH160: op_hash160, - Opcode.OP_GREATERTHAN_TIMESTAMP: op_greaterthan_timestamp, - Opcode.OP_CHECKMULTISIG: op_checkmultisig, - Opcode.OP_DATA_STREQUAL: op_data_strequal, - Opcode.OP_DATA_GREATERTHAN: op_data_greaterthan, - Opcode.OP_DATA_MATCH_VALUE: op_data_match_value, - Opcode.OP_CHECKDATASIG: op_checkdatasig, - Opcode.OP_FIND_P2PKH: op_find_p2pkh, -} +def execute_op_code(opcode: Opcode, context: ScriptContext) -> None: + """ + Execute a function opcode. + + Args: + opcode: the opcode to be executed. + context: the script context to be manipulated. + """ + context.logs.append(f'Executing function opcode {opcode.name} ({hex(opcode.value)})') + match opcode: + case Opcode.OP_DUP: op_dup(context) + case Opcode.OP_EQUAL: op_equal(context) + case Opcode.OP_EQUALVERIFY: op_equalverify(context) + case Opcode.OP_CHECKSIG: op_checksig(context) + case Opcode.OP_HASH160: op_hash160(context) + case Opcode.OP_GREATERTHAN_TIMESTAMP: op_greaterthan_timestamp(context) + case Opcode.OP_CHECKMULTISIG: op_checkmultisig(context) + case Opcode.OP_DATA_STREQUAL: op_data_strequal(context) + case Opcode.OP_DATA_GREATERTHAN: op_data_greaterthan(context) + case Opcode.OP_DATA_MATCH_VALUE: op_data_match_value(context) + case Opcode.OP_CHECKDATASIG: op_checkdatasig(context) + case Opcode.OP_FIND_P2PKH: op_find_p2pkh(context) + case _: raise ScriptError(f'unknown opcode: {opcode}') diff --git a/hathor/transaction/scripts/script_context.py b/hathor/transaction/scripts/script_context.py new file mode 100644 index 000000000..925a881f1 --- /dev/null +++ b/hathor/transaction/scripts/script_context.py @@ -0,0 +1,25 @@ +# Copyright 2023 Hathor Labs +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from hathor.transaction.scripts.execute import ScriptExtras, Stack + + +class ScriptContext: + """A context to be manipulated during script execution. A separate instance must be used for each script.""" + __slots__ = ('stack', 'logs', 'extras') + + def __init__(self, *, stack: Stack, logs: list[str], extras: ScriptExtras) -> None: + self.stack = stack + self.logs = logs + self.extras = extras diff --git a/tests/tx/test_scripts.py b/tests/tx/test_scripts.py index 853be2ac4..b6cf99566 100644 --- a/tests/tx/test_scripts.py +++ b/tests/tx/test_scripts.py @@ -1,4 +1,5 @@ import struct +from unittest.mock import Mock from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.asymmetric import ec @@ -54,6 +55,7 @@ op_pushdata, op_pushdata1, ) +from hathor.transaction.scripts.script_context import ScriptContext from hathor.transaction.storage import TransactionMemoryStorage from hathor.wallet import HDWallet from tests import unittest @@ -174,22 +176,22 @@ def test_pushdata1(self): def test_dup(self): with self.assertRaises(MissingStackItems): - op_dup([], log=[], extras=None) + op_dup(ScriptContext(stack=[], logs=[], extras=Mock())) stack = [1] - op_dup(stack, log=[], extras=None) + op_dup(ScriptContext(stack=stack, logs=[], extras=Mock())) self.assertEqual(stack[-1], stack[-2]) def test_equalverify(self): elem = b'a' with self.assertRaises(MissingStackItems): - op_equalverify([elem], log=[], extras=None) + op_equalverify(ScriptContext(stack=[elem], logs=[], extras=Mock())) # no exception should be raised - op_equalverify([elem, elem], log=[], extras=None) + op_equalverify(ScriptContext(stack=[elem, elem], logs=[], extras=Mock())) with self.assertRaises(EqualVerifyFailed): - op_equalverify([elem, b'aaaa'], log=[], extras=None) + op_equalverify(ScriptContext(stack=[elem, b'aaaa'], logs=[], extras=Mock())) def test_checksig_raise_on_uncompressed_pubkey(self): """ Uncompressed pubkeys shoud not be accepted, even if they solve the signature @@ -211,11 +213,11 @@ def test_checksig_raise_on_uncompressed_pubkey(self): # ScriptError if pubkey is not a valid compressed public key # with wrong signature with self.assertRaises(ScriptError): - op_checksig([b'123', pubkey_uncompressed], log=[], extras=None) + op_checksig(ScriptContext(stack=[b'123', pubkey_uncompressed], logs=[], extras=Mock())) # or with rigth one # this will make sure the signature is not made when parameters are wrong with self.assertRaises(ScriptError): - op_checksig([signature, pubkey_uncompressed], log=[], extras=None) + op_checksig(ScriptContext(stack=[signature, pubkey_uncompressed], logs=[], extras=Mock())) def test_checksig_check_for_compressed_pubkey(self): """ Compressed pubkeys bytes representation always start with a byte 2 or 3 @@ -224,19 +226,19 @@ def test_checksig_check_for_compressed_pubkey(self): """ # ScriptError if pubkey is not a public key but starts with 2 or 3 with self.assertRaises(ScriptError): - op_checksig([b'\x0233', b'\x0233'], log=[], extras=None) + op_checksig(ScriptContext(stack=[b'\x0233', b'\x0233'], logs=[], extras=Mock())) with self.assertRaises(ScriptError): - op_checksig([b'\x0321', b'\x0321'], log=[], extras=None) + op_checksig(ScriptContext(stack=[b'\x0321', b'\x0321'], logs=[], extras=Mock())) # ScriptError if pubkey does not start with 2 or 3 with self.assertRaises(ScriptError): - op_checksig([b'\x0123', b'\x0123'], log=[], extras=None) + op_checksig(ScriptContext(stack=[b'\x0123', b'\x0123'], logs=[], extras=Mock())) with self.assertRaises(ScriptError): - op_checksig([b'\x0423', b'\x0423'], log=[], extras=None) + op_checksig(ScriptContext(stack=[b'\x0423', b'\x0423'], logs=[], extras=Mock())) def test_checksig(self): with self.assertRaises(MissingStackItems): - op_checksig([1], log=[], extras=None) + op_checksig(ScriptContext(stack=[1], logs=[], extras=Mock())) block = self.genesis_blocks[0] @@ -251,15 +253,15 @@ def test_checksig(self): signature = self.genesis_private_key.sign(hashed_data, ec.ECDSA(hashes.SHA256())) pubkey_bytes = get_public_key_bytes_compressed(self.genesis_public_key) - extras = ScriptExtras(tx=tx, txin=None, spent_tx=None) + extras = ScriptExtras(tx=tx, txin=Mock(), spent_tx=Mock()) # wrong signature puts False (0) on stack stack = [b'aaaaaaaaa', pubkey_bytes] - op_checksig(stack, log=[], extras=extras) + op_checksig(ScriptContext(stack=stack, logs=[], extras=extras)) self.assertEqual(0, stack.pop()) stack = [signature, pubkey_bytes] - op_checksig(stack, log=[], extras=extras) + op_checksig(ScriptContext(stack=stack, logs=[], extras=extras)) self.assertEqual(1, stack.pop()) def test_checksig_cache(self): @@ -276,22 +278,22 @@ def test_checksig_cache(self): signature = self.genesis_private_key.sign(hashed_data, ec.ECDSA(hashes.SHA256())) pubkey_bytes = get_public_key_bytes_compressed(self.genesis_public_key) - extras = ScriptExtras(tx=tx, txin=None, spent_tx=None) + extras = ScriptExtras(tx=tx, txin=Mock(), spent_tx=Mock()) stack = [signature, pubkey_bytes] self.assertIsNone(tx._sighash_data_cache) - op_checksig(stack, log=[], extras=extras) + op_checksig(ScriptContext(stack=stack, logs=[], extras=extras)) self.assertIsNotNone(tx._sighash_data_cache) self.assertEqual(1, stack.pop()) def test_hash160(self): with self.assertRaises(MissingStackItems): - op_hash160([], log=[], extras=None) + op_hash160(ScriptContext(stack=[], logs=[], extras=Mock())) elem = b'aaaaaaaa' hash160 = get_hash160(elem) stack = [elem] - op_hash160(stack, log=[], extras=None) + op_hash160(ScriptContext(stack=stack, logs=[], extras=Mock())) self.assertEqual(hash160, stack.pop()) def test_checkdatasig_raise_on_uncompressed_pubkey(self): @@ -314,27 +316,27 @@ def test_checkdatasig_raise_on_uncompressed_pubkey(self): # with wrong signature stack = [data, b'123', pubkey_uncompressed] with self.assertRaises(ScriptError): - op_checkdatasig(stack, log=[], extras=None) + op_checkdatasig(ScriptContext(stack=stack, logs=[], extras=Mock())) # or with rigth one # this will make sure the signature is not made when parameters are wrong stack = [data, signature, pubkey_uncompressed] with self.assertRaises(ScriptError): - op_checkdatasig(stack, log=[], extras=None) + op_checkdatasig(ScriptContext(stack=stack, logs=[], extras=Mock())) def test_checkdatasig_check_for_compressed_pubkey(self): # ScriptError if pubkey is not a public key but starts with 2 or 3 with self.assertRaises(ScriptError): - op_checkdatasig([b'\x0233', b'\x0233', b'\x0233'], log=[], extras=None) + op_checkdatasig(ScriptContext(stack=[b'\x0233', b'\x0233', b'\x0233'], logs=[], extras=Mock())) with self.assertRaises(ScriptError): - op_checkdatasig([b'\x0321', b'\x0321', b'\x0321'], log=[], extras=None) + op_checkdatasig(ScriptContext(stack=[b'\x0321', b'\x0321', b'\x0321'], logs=[], extras=Mock())) # ScriptError if pubkey is not a public key with self.assertRaises(ScriptError): - op_checkdatasig([b'\x0123', b'\x0123', b'\x0123'], log=[], extras=None) + op_checkdatasig(ScriptContext(stack=[b'\x0123', b'\x0123', b'\x0123'], logs=[], extras=Mock())) def test_checkdatasig(self): with self.assertRaises(MissingStackItems): - op_checkdatasig([1, 1], log=[], extras=None) + op_checkdatasig(ScriptContext(stack=[1, 1], logs=[], extras=Mock())) data = b'some_random_data' signature = self.genesis_private_key.sign(data, ec.ECDSA(hashes.SHA256())) @@ -342,12 +344,12 @@ def test_checkdatasig(self): stack = [data, signature, pubkey_bytes] # no exception should be raised and data is left on stack - op_checkdatasig(stack, log=[], extras=None) + op_checkdatasig(ScriptContext(stack=stack, logs=[], extras=Mock())) self.assertEqual(data, stack.pop()) stack = [b'data_not_matching', signature, pubkey_bytes] with self.assertRaises(OracleChecksigFailed): - op_checkdatasig(stack, log=[], extras=None) + op_checkdatasig(ScriptContext(stack=stack, logs=[], extras=Mock())) def test_get_data_value(self): value0 = b'value0' @@ -368,7 +370,7 @@ def test_get_data_value(self): def test_data_strequal(self): with self.assertRaises(MissingStackItems): - op_data_strequal([1, 1], log=[], extras=None) + op_data_strequal(ScriptContext(stack=[1, 1], logs=[], extras=Mock())) value0 = b'value0' value1 = b'vvvalue1' @@ -377,20 +379,20 @@ def test_data_strequal(self): data = (bytes([len(value0)]) + value0 + bytes([len(value1)]) + value1 + bytes([len(value2)]) + value2) stack = [data, 0, value0] - op_data_strequal(stack, log=[], extras=None) + op_data_strequal(ScriptContext(stack=stack, logs=[], extras=Mock())) self.assertEqual(stack.pop(), data) stack = [data, 1, value0] with self.assertRaises(VerifyFailed): - op_data_strequal(stack, log=[], extras=None) + op_data_strequal(ScriptContext(stack=stack, logs=[], extras=Mock())) stack = [data, b'\x00', value0] with self.assertRaises(VerifyFailed): - op_data_strequal(stack, log=[], extras=None) + op_data_strequal(ScriptContext(stack=stack, logs=[], extras=Mock())) def test_data_greaterthan(self): with self.assertRaises(MissingStackItems): - op_data_greaterthan([1, 1], log=[], extras=None) + op_data_greaterthan(ScriptContext(stack=[1, 1], logs=[], extras=Mock())) value0 = struct.pack('!I', 1000) value1 = struct.pack('!I', 1) @@ -398,93 +400,93 @@ def test_data_greaterthan(self): data = (bytes([len(value0)]) + value0 + bytes([len(value1)]) + value1) stack = [data, 0, struct.pack('!I', 999)] - op_data_greaterthan(stack, log=[], extras=None) + op_data_greaterthan(ScriptContext(stack=stack, logs=[], extras=Mock())) self.assertEqual(stack.pop(), data) stack = [data, 1, struct.pack('!I', 0)] - op_data_greaterthan(stack, log=[], extras=None) + op_data_greaterthan(ScriptContext(stack=stack, logs=[], extras=Mock())) self.assertEqual(stack.pop(), data) with self.assertRaises(VerifyFailed): stack = [data, 1, struct.pack('!I', 1)] - op_data_greaterthan(stack, log=[], extras=None) + op_data_greaterthan(ScriptContext(stack=stack, logs=[], extras=Mock())) stack = [data, 1, b'not_an_int'] with self.assertRaises(VerifyFailed): - op_data_greaterthan(stack, log=[], extras=None) + op_data_greaterthan(ScriptContext(stack=stack, logs=[], extras=Mock())) stack = [data, b'\x00', struct.pack('!I', 0)] with self.assertRaises(VerifyFailed): - op_data_greaterthan(stack, log=[], extras=None) + op_data_greaterthan(ScriptContext(stack=stack, logs=[], extras=Mock())) def test_data_match_interval(self): with self.assertRaises(MissingStackItems): - op_data_match_interval([1, b'2'], log=[], extras=None) + op_data_match_interval([1, b'2']) value0 = struct.pack('!I', 1000) data = (bytes([len(value0)]) + value0) stack = [data, 0, 'key1', struct.pack('!I', 1000), 'key2', struct.pack('!I', 1005), 'key3', bytes([2])] - op_data_match_interval(stack, log=[], extras=None) + op_data_match_interval(stack) self.assertEqual(stack.pop(), 'key1') self.assertEqual(len(stack), 0) stack = [data, 0, 'key1', struct.pack('!I', 100), 'key2', struct.pack('!I', 1005), 'key3', bytes([2])] - op_data_match_interval(stack, log=[], extras=None) + op_data_match_interval(stack) self.assertEqual(stack.pop(), 'key2') self.assertEqual(len(stack), 0) stack = [data, 0, 'key1', struct.pack('!I', 100), 'key2', struct.pack('!I', 900), 'key3', bytes([2])] - op_data_match_interval(stack, log=[], extras=None) + op_data_match_interval(stack) self.assertEqual(stack.pop(), 'key3') self.assertEqual(len(stack), 0) # missing 1 item on stack stack = [data, 0, struct.pack('!I', 100), 'key2', struct.pack('!I', 900), 'key3', bytes([2])] with self.assertRaises(MissingStackItems): - op_data_match_interval(stack, log=[], extras=None) + op_data_match_interval(stack) # value should be an integer stack = [data, 0, 'key1', struct.pack('!I', 100), 'key2', b'not_an_int', 'key3', bytes([2])] with self.assertRaises(VerifyFailed): - op_data_match_interval(stack, log=[], extras=None) + op_data_match_interval(stack) def test_data_match_value(self): with self.assertRaises(MissingStackItems): - op_data_match_value([1, b'2'], log=[], extras=None) + op_data_match_value(ScriptContext(stack=[1, b'2'], logs=[], extras=Mock())) value0 = struct.pack('!I', 1000) data = (bytes([len(value0)]) + value0) stack = [data, 0, 'key1', struct.pack('!I', 1000), 'key2', struct.pack('!I', 1005), 'key3', bytes([2])] - op_data_match_value(stack, log=[], extras=None) + op_data_match_value(ScriptContext(stack=stack, logs=[], extras=Mock())) self.assertEqual(stack.pop(), 'key2') self.assertEqual(len(stack), 0) stack = [data, 0, 'key1', struct.pack('!I', 999), 'key2', struct.pack('!I', 1000), 'key3', bytes([2])] - op_data_match_value(stack, log=[], extras=None) + op_data_match_value(ScriptContext(stack=stack, logs=[], extras=Mock())) self.assertEqual(stack.pop(), 'key3') self.assertEqual(len(stack), 0) # missing 1 item on stack stack = [data, 0, 'key1', struct.pack('!I', 1000), 'key2', struct.pack('!I', 1000), bytes([2])] with self.assertRaises(MissingStackItems): - op_data_match_value(stack, log=[], extras=None) + op_data_match_value(ScriptContext(stack=stack, logs=[], extras=Mock())) # no value matches stack = [data, 0, 'key1', struct.pack('!I', 999), 'key2', struct.pack('!I', 1111), 'key3', bytes([2])] - op_data_match_value(stack, log=[], extras=None) + op_data_match_value(ScriptContext(stack=stack, logs=[], extras=Mock())) self.assertEqual(stack.pop(), 'key1') self.assertEqual(len(stack), 0) # value should be an integer stack = [data, 0, 'key1', struct.pack('!I', 100), 'key2', b'not_an_int', 'key3', bytes([2])] with self.assertRaises(VerifyFailed): - op_data_match_value(stack, log=[], extras=None) + op_data_match_value(ScriptContext(stack=stack, logs=[], extras=Mock())) def test_find_p2pkh(self): with self.assertRaises(MissingStackItems): - op_find_p2pkh([], log=[], extras=None) + op_find_p2pkh(ScriptContext(stack=[], logs=[], extras=Mock())) addr1 = '15d14K5jMqsN2uwUEFqiPG5SoD7Vr1BfnH' addr2 = '1K35zJQeYrVzQAW7X3s7vbPKmngj5JXTBc' @@ -507,14 +509,14 @@ def test_find_p2pkh(self): stack = [genesis_address] tx = Transaction(outputs=[TxOutput(1, out_genesis)]) extras = ScriptExtras(tx=tx, txin=txin, spent_tx=spent_tx) - op_find_p2pkh(stack, log=[], extras=extras) + op_find_p2pkh(ScriptContext(stack=stack, logs=[], extras=extras)) self.assertEqual(stack.pop(), 1) # several outputs and correct output among them stack = [genesis_address] tx = Transaction(outputs=[TxOutput(1, out1), TxOutput(1, out2), TxOutput(1, out_genesis), TxOutput(1, out3)]) extras = ScriptExtras(tx=tx, txin=txin, spent_tx=spent_tx) - op_find_p2pkh(stack, log=[], extras=extras) + op_find_p2pkh(ScriptContext(stack=stack, logs=[], extras=extras)) self.assertEqual(stack.pop(), 1) # several outputs without correct amount output @@ -522,18 +524,18 @@ def test_find_p2pkh(self): tx = Transaction(outputs=[TxOutput(1, out1), TxOutput(1, out2), TxOutput(2, out_genesis), TxOutput(1, out3)]) extras = ScriptExtras(tx=tx, txin=txin, spent_tx=spent_tx) with self.assertRaises(VerifyFailed): - op_find_p2pkh(stack, log=[], extras=extras) + op_find_p2pkh(ScriptContext(stack=stack, logs=[], extras=extras)) # several outputs without correct address output stack = [genesis_address] tx = Transaction(outputs=[TxOutput(1, out1), TxOutput(1, out2), TxOutput(1, out3)]) extras = ScriptExtras(tx=tx, txin=txin, spent_tx=spent_tx) with self.assertRaises(VerifyFailed): - op_find_p2pkh(stack, log=[], extras=extras) + op_find_p2pkh(ScriptContext(stack=stack, logs=[], extras=extras)) def test_greaterthan_timestamp(self): with self.assertRaises(MissingStackItems): - op_greaterthan_timestamp([], log=[], extras=None) + op_greaterthan_timestamp(ScriptContext(stack=[], logs=[], extras=Mock())) timestamp = 1234567 @@ -541,23 +543,23 @@ def test_greaterthan_timestamp(self): tx = Transaction() stack = [struct.pack('!I', timestamp)] - extras = ScriptExtras(tx=tx, txin=None, spent_tx=None) + extras = ScriptExtras(tx=tx, txin=Mock(), spent_tx=Mock()) with self.assertRaises(TimeLocked): tx.timestamp = timestamp - 1 - op_greaterthan_timestamp(list(stack), log=[], extras=extras) + op_greaterthan_timestamp(ScriptContext(stack=list(stack), logs=[], extras=extras)) with self.assertRaises(TimeLocked): tx.timestamp = timestamp - op_greaterthan_timestamp(list(stack), log=[], extras=extras) + op_greaterthan_timestamp(ScriptContext(stack=list(stack), logs=[], extras=extras)) tx.timestamp = timestamp + 1 - op_greaterthan_timestamp(stack, log=[], extras=extras) + op_greaterthan_timestamp(ScriptContext(stack=stack, logs=[], extras=extras)) self.assertEqual(len(stack), 0) def test_checkmultisig(self): with self.assertRaises(MissingStackItems): - op_checkmultisig([], log=[], extras=None) + op_checkmultisig(ScriptContext(stack=[], logs=[], extras=Mock())) block = self.genesis_blocks[0] @@ -567,7 +569,7 @@ def test_checkmultisig(self): tx = Transaction(inputs=[txin], outputs=[txout]) data_to_sign = tx.get_sighash_all() - extras = ScriptExtras(tx=tx, txin=None, spent_tx=None) + extras = ScriptExtras(tx=tx, txin=Mock(), spent_tx=Mock()) wallet = HDWallet() wallet._manually_initialize() @@ -596,107 +598,107 @@ def test_checkmultisig(self): stack = [ keys[0]['signature'], keys[2]['signature'], 2, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 3 ] - op_checkmultisig(stack, log=[], extras=extras) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) self.assertEqual(1, stack.pop()) # New set of valid signatures stack = [ keys[0]['signature'], keys[1]['signature'], 2, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 3 ] - op_checkmultisig(stack, log=[], extras=extras) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) self.assertEqual(1, stack.pop()) # Changing the signatures but they match stack = [ keys[1]['signature'], keys[2]['signature'], 2, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 3 ] - op_checkmultisig(stack, log=[], extras=extras) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) self.assertEqual(1, stack.pop()) # Signatures are valid but in wrong order stack = [ keys[1]['signature'], keys[0]['signature'], 2, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 3 ] - op_checkmultisig(stack, log=[], extras=extras) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) self.assertEqual(0, stack.pop()) # Adding wrong signature, so we get error stack = [ keys[0]['signature'], wrong_key['signature'], 2, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 3 ] - op_checkmultisig(stack, log=[], extras=extras) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) self.assertEqual(0, stack.pop()) # Adding same signature twice, so we get error stack = [ keys[0]['signature'], keys[0]['signature'], 2, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 3 ] - op_checkmultisig(stack, log=[], extras=extras) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) self.assertEqual(0, stack.pop()) # Adding less signatures than required, so we get error stack = [keys[0]['signature'], 2, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 3] with self.assertRaises(MissingStackItems): - op_checkmultisig(stack, log=[], extras=extras) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) # Quantity of signatures is more than it should stack = [ keys[0]['signature'], keys[1]['signature'], 3, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 3 ] with self.assertRaises(MissingStackItems): - op_checkmultisig(stack, log=[], extras=extras) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) # Quantity of pubkeys is more than it should stack = [ keys[0]['signature'], keys[1]['signature'], 2, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 4 ] with self.assertRaises(InvalidStackData): - op_checkmultisig(stack, log=[], extras=extras) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) # Exception pubkey_count should be integer stack = [ keys[0]['signature'], keys[1]['signature'], 2, keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], '3' ] with self.assertRaises(InvalidStackData): - op_checkmultisig(stack, log=[], extras=extras) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) # Exception not enough pub keys stack = [keys[0]['pubkey'], keys[1]['pubkey'], 3] with self.assertRaises(MissingStackItems): - op_checkmultisig(stack, log=[], extras=extras) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) # Exception stack empty after pubkeys stack = [keys[0]['pubkey'], keys[1]['pubkey'], keys[2]['pubkey'], 3] with self.assertRaises(MissingStackItems): - op_checkmultisig(stack, log=[], extras=extras) + op_checkmultisig(ScriptContext(stack=stack, logs=[], extras=extras)) def test_equal(self): elem = b'a' with self.assertRaises(MissingStackItems): - op_equal([elem], log=[], extras=None) + op_equal(ScriptContext(stack=[elem], logs=[], extras=Mock())) # no exception should be raised stack = [elem, elem] - op_equal(stack, log=[], extras=None) + op_equal(ScriptContext(stack=stack, logs=[], extras=Mock())) self.assertEqual(stack.pop(), 1) stack = [elem, b'aaaa'] - op_equal(stack, log=[], extras=None) + op_equal(ScriptContext(stack=stack, logs=[], extras=Mock())) self.assertEqual(stack.pop(), 0) def test_integer_opcode(self): # We have opcodes from OP_0 to OP_16 for i in range(0, 17): stack = [] - op_integer(getattr(Opcode, 'OP_{}'.format(i)), stack, [], None) + op_integer(getattr(Opcode, 'OP_{}'.format(i)), stack) self.assertEqual(stack, [i]) stack = [] with self.assertRaises(ScriptError): - op_integer(0, stack, [], None) + op_integer(0, stack) with self.assertRaises(ScriptError): - op_integer(0x61, stack, [], None) + op_integer(0x61, stack) def test_decode_opn(self): for i in range(0, 17):