|
| 1 | +# Copyright 2021 Hathor Labs |
| 2 | +# |
| 3 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +# you may not use this file except in compliance with the License. |
| 5 | +# You may obtain a copy of the License at |
| 6 | +# |
| 7 | +# http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +# |
| 9 | +# Unless required by applicable law or agreed to in writing, software |
| 10 | +# distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +# See the License for the specific language governing permissions and |
| 13 | +# limitations under the License. |
| 14 | + |
| 15 | +import re |
| 16 | +from typing import TYPE_CHECKING, Any, Generator, NamedTuple, Optional, Pattern, Union |
| 17 | + |
| 18 | +from hathor.conf.get_settings import get_settings |
| 19 | +from hathor.crypto.util import decode_address |
| 20 | +from hathor.transaction.exceptions import OutOfData, ScriptError |
| 21 | +from hathor.transaction.scripts.base_script import BaseScript |
| 22 | + |
| 23 | +if TYPE_CHECKING: |
| 24 | + from hathor.transaction.scripts import P2PKH, MultiSig, Opcode |
| 25 | + |
| 26 | + |
| 27 | +class OpcodePosition(NamedTuple): |
| 28 | + opcode: int |
| 29 | + position: int |
| 30 | + |
| 31 | + |
| 32 | +def re_compile(pattern: str) -> Pattern[bytes]: |
| 33 | + """ Transform a given script pattern into a regular expression. |
| 34 | +
|
| 35 | + The script pattern is like a regular expression, but you may include five |
| 36 | + special symbols: |
| 37 | + (i) OP_DUP, OP_HASH160, and all other opcodes; |
| 38 | + (ii) DATA_<length>: data with the specified length; |
| 39 | + (iii) NUMBER: a 4-byte integer; |
| 40 | + (iv) BLOCK: a variable length block, to be parsed later |
| 41 | +
|
| 42 | + Example: |
| 43 | + >>> r = re_compile( |
| 44 | + ... '^(?:DATA_4 OP_GREATERTHAN_TIMESTAMP)? ' |
| 45 | + ... 'OP_DUP OP_HASH160 (DATA_20) OP_EQUALVERIFY OP_CHECKSIG$' |
| 46 | + ... ) |
| 47 | +
|
| 48 | + :return: A compiled regular expression matcher |
| 49 | + :rtype: :py:class:`re.Pattern` |
| 50 | + """ |
| 51 | + |
| 52 | + def _to_byte_pattern(m): |
| 53 | + x = m.group().decode('ascii').strip() |
| 54 | + if x.startswith('OP_'): |
| 55 | + from hathor.transaction.scripts.opcode import Opcode |
| 56 | + return bytes([Opcode[x]]) |
| 57 | + elif x.startswith('DATA_'): |
| 58 | + length = int(m.group()[5:]) |
| 59 | + return _re_pushdata(length) |
| 60 | + elif x.startswith('NUMBER'): |
| 61 | + return b'.{5}' |
| 62 | + elif x.startswith('BLOCK'): |
| 63 | + return b'.*' |
| 64 | + else: |
| 65 | + raise ValueError('Invalid opcode: {}'.format(x)) |
| 66 | + |
| 67 | + p = pattern.encode('ascii') |
| 68 | + p = re.sub(rb'\s*([A-Z0-9_]+)\s*', _to_byte_pattern, p) |
| 69 | + return re.compile(p, re.DOTALL) |
| 70 | + |
| 71 | + |
| 72 | +def _re_pushdata(length: int) -> bytes: |
| 73 | + """ Create a regular expression that matches a data block with a given length. |
| 74 | +
|
| 75 | + :return: A non-compiled regular expression |
| 76 | + :rtype: bytes |
| 77 | + """ |
| 78 | + from hathor.transaction.scripts.opcode import Opcode |
| 79 | + ret = [bytes([Opcode.OP_PUSHDATA1]), bytes([length]), b'.{', str(length).encode('ascii'), b'}'] |
| 80 | + |
| 81 | + if length <= 75: |
| 82 | + # for now, we accept <= 75 bytes with OP_PUSHDATA1. It's optional |
| 83 | + ret.insert(1, b'?') |
| 84 | + |
| 85 | + return b''.join(ret) |
| 86 | + |
| 87 | + |
| 88 | +def create_base_script(address: str, timelock: Optional[Any] = None) -> BaseScript: |
| 89 | + """ Verifies if address is P2PKH or Multisig and return the corresponding BaseScript implementation. |
| 90 | + """ |
| 91 | + from hathor.transaction.scripts.execute import binary_to_int |
| 92 | + settings = get_settings() |
| 93 | + baddress = decode_address(address) |
| 94 | + if baddress[0] == binary_to_int(settings.P2PKH_VERSION_BYTE): |
| 95 | + from hathor.transaction.scripts import P2PKH |
| 96 | + return P2PKH(address, timelock) |
| 97 | + elif baddress[0] == binary_to_int(settings.MULTISIG_VERSION_BYTE): |
| 98 | + from hathor.transaction.scripts import MultiSig |
| 99 | + return MultiSig(address, timelock) |
| 100 | + else: |
| 101 | + raise ScriptError('The address is not valid') |
| 102 | + |
| 103 | + |
| 104 | +def create_output_script(address: bytes, timelock: Optional[Any] = None) -> bytes: |
| 105 | + """ Verifies if address is P2PKH or Multisig and create correct output script |
| 106 | +
|
| 107 | + :param address: address to send tokens |
| 108 | + :type address: bytes |
| 109 | +
|
| 110 | + :param timelock: timestamp until when the output is locked |
| 111 | + :type timelock: bytes |
| 112 | +
|
| 113 | + :raises ScriptError: if address is not from one of the possible options |
| 114 | +
|
| 115 | + :rtype: bytes |
| 116 | + """ |
| 117 | + from hathor.transaction.scripts.execute import binary_to_int |
| 118 | + settings = get_settings() |
| 119 | + # XXX: if the address class can somehow be simplified create_base_script could be used here |
| 120 | + if address[0] == binary_to_int(settings.P2PKH_VERSION_BYTE): |
| 121 | + from hathor.transaction.scripts import P2PKH |
| 122 | + return P2PKH.create_output_script(address, timelock) |
| 123 | + elif address[0] == binary_to_int(settings.MULTISIG_VERSION_BYTE): |
| 124 | + from hathor.transaction.scripts import MultiSig |
| 125 | + return MultiSig.create_output_script(address, timelock) |
| 126 | + else: |
| 127 | + raise ScriptError('The address is not valid') |
| 128 | + |
| 129 | + |
| 130 | +def parse_address_script(script: bytes) -> Optional[Union['P2PKH', 'MultiSig']]: |
| 131 | + """ Verifies if address is P2PKH or Multisig and calls correct parse_script method |
| 132 | +
|
| 133 | + :param script: script to decode |
| 134 | + :type script: bytes |
| 135 | +
|
| 136 | + :return: P2PKH or MultiSig class or None |
| 137 | + :rtype: class or None |
| 138 | + """ |
| 139 | + from hathor.transaction.scripts import P2PKH, MultiSig |
| 140 | + script_classes: list[type[Union[P2PKH, MultiSig]]] = [P2PKH, MultiSig] |
| 141 | + # Each class verifies its script |
| 142 | + for script_class in script_classes: |
| 143 | + if script_class.re_match.search(script): |
| 144 | + return script_class.parse_script(script) |
| 145 | + return None |
| 146 | + |
| 147 | + |
| 148 | +def get_data_bytes(position: int, length: int, data: bytes) -> bytes: |
| 149 | + """ Extract `length` bytes from `data` starting at `position` |
| 150 | +
|
| 151 | + :param position: start position of bytes string to extract |
| 152 | + :type position: int |
| 153 | +
|
| 154 | + :param length: len of bytes str to extract |
| 155 | + :type length: int |
| 156 | +
|
| 157 | + :param data: script containing data to extract |
| 158 | + :type data: bytes |
| 159 | +
|
| 160 | + :raises OutOfData: when trying to read out of script |
| 161 | +
|
| 162 | + :return: bytes string of extracted data |
| 163 | + :rtype: bytes |
| 164 | + """ |
| 165 | + if not (0 < length <= len(data)): |
| 166 | + raise OutOfData("length ({}) should be from 0 up to data length".format(length)) |
| 167 | + if not (0 < position < len(data)): |
| 168 | + raise OutOfData("position should be inside data") |
| 169 | + if (position+length) > len(data): |
| 170 | + raise OutOfData('trying to read {} bytes starting at {}, available {}'.format(length, position, len(data))) |
| 171 | + return data[position:position+length] |
| 172 | + |
| 173 | + |
| 174 | +def get_data_single_byte(position: int, data: bytes) -> int: |
| 175 | + """ Extract 1 byte from `data` at `position` |
| 176 | +
|
| 177 | + :param position: position of byte to extract |
| 178 | + :type position: int |
| 179 | +
|
| 180 | + :param data: script containing data to extract |
| 181 | + :type data: bytes |
| 182 | +
|
| 183 | + :raises OutOfData: when trying to read out of script |
| 184 | +
|
| 185 | + :return: extracted byte |
| 186 | + :rtype: int |
| 187 | + """ |
| 188 | + if not (0 <= position < len(data)): |
| 189 | + raise OutOfData("trying to read a byte at {} outside of data, available {}".format(position, len(data))) |
| 190 | + return data[position] |
| 191 | + |
| 192 | + |
| 193 | +class _ScriptOperation(NamedTuple): |
| 194 | + opcode: Union['Opcode', int] |
| 195 | + position: int |
| 196 | + data: Union[None, bytes, int, str] |
| 197 | + |
| 198 | + |
| 199 | +def parse_script_ops(data: bytes) -> Generator[_ScriptOperation, None, None]: |
| 200 | + """ Parse script yielding each operation on the script |
| 201 | + this is an utility function to make scripts human readable for debugging and dev |
| 202 | +
|
| 203 | + :param data: script to parse that contains data and opcodes |
| 204 | + :type data: bytes |
| 205 | +
|
| 206 | + :return: generator for operations on script |
| 207 | + :rtype: Generator[_ScriptOperation, None, None] |
| 208 | + """ |
| 209 | + from hathor.transaction.scripts import Opcode |
| 210 | + from hathor.transaction.scripts.execute import Stack, get_script_op |
| 211 | + op: Union[Opcode, int] |
| 212 | + |
| 213 | + pos = 0 |
| 214 | + last_pos = 0 |
| 215 | + data_len = len(data) |
| 216 | + stack: Stack = [] |
| 217 | + while pos < data_len: |
| 218 | + last_pos = pos |
| 219 | + opcode, pos = get_script_op(pos, data, stack) |
| 220 | + try: |
| 221 | + op = Opcode(opcode) |
| 222 | + except ValueError: |
| 223 | + op = opcode |
| 224 | + if len(stack) != 0: |
| 225 | + yield _ScriptOperation(opcode=op, position=last_pos, data=stack.pop()) |
| 226 | + else: |
| 227 | + yield _ScriptOperation(opcode=op, position=last_pos, data=None) |
| 228 | + |
| 229 | + |
| 230 | +def count_sigops(data: bytes) -> int: |
| 231 | + """ Count number of signature operations on the script |
| 232 | +
|
| 233 | + :param data: script to parse that contains data and opcodes |
| 234 | + :type data: bytes |
| 235 | +
|
| 236 | + :raises OutOfData: when trying to read out of script |
| 237 | + :raises InvalidScriptError: when an invalid opcode is found |
| 238 | + :raises InvalidScriptError: when the previous opcode to an |
| 239 | + OP_CHECKMULTISIG is not an integer (number of operations to execute) |
| 240 | +
|
| 241 | + :return: number of signature operations the script would do if it was executed |
| 242 | + :rtype: int |
| 243 | + """ |
| 244 | + from hathor.transaction.scripts import Opcode |
| 245 | + from hathor.transaction.scripts.execute import decode_opn, get_script_op |
| 246 | + settings = get_settings() |
| 247 | + n_ops: int = 0 |
| 248 | + data_len: int = len(data) |
| 249 | + pos: int = 0 |
| 250 | + last_opcode: Union[int, None] = None |
| 251 | + |
| 252 | + while pos < data_len: |
| 253 | + opcode, pos = get_script_op(pos, data) |
| 254 | + |
| 255 | + if opcode == Opcode.OP_CHECKSIG: |
| 256 | + n_ops += 1 |
| 257 | + elif opcode == Opcode.OP_CHECKMULTISIG: |
| 258 | + assert isinstance(last_opcode, int) |
| 259 | + if Opcode.OP_0 <= last_opcode <= Opcode.OP_16: |
| 260 | + # Conventional OP_CHECKMULTISIG: <sign_1>...<sign_m> <m> <pubkey_1>...<pubkey_n> <n> <checkmultisig> |
| 261 | + # this function will run op_checksig with each pair (sign_x, pubkey_y) until all signatures |
| 262 | + # are verified so the worst case scenario is n op_checksig and the best m op_checksig |
| 263 | + # we know m <= n, so for now we are counting n operations (the upper limit) |
| 264 | + n_ops += decode_opn(last_opcode) |
| 265 | + else: |
| 266 | + # Unconventional OP_CHECKMULTISIG: |
| 267 | + # We count the limit for PUBKEYS, since this is also the upper limit on signature operations |
| 268 | + # that any op_checkmultisig would run |
| 269 | + n_ops += settings.MAX_MULTISIG_PUBKEYS |
| 270 | + last_opcode = opcode |
| 271 | + return n_ops |
| 272 | + |
| 273 | + |
| 274 | +def get_sigops_count(data: bytes, output_script: Optional[bytes] = None) -> int: |
| 275 | + """ Count number of signature operations on the script, if it's an input script and the spent output is passed |
| 276 | + check the spent output for MultiSig and count operations on redeem_script too |
| 277 | +
|
| 278 | + :param data: script to parse with opcodes |
| 279 | + :type data: bytes |
| 280 | +
|
| 281 | + :param output_script: spent output script if data was from an TxIn |
| 282 | + :type output_script: Union[None, bytes] |
| 283 | +
|
| 284 | + :raises OutOfData: when trying to read out of script |
| 285 | + :raises InvalidScriptError: when an invalid opcode is found |
| 286 | +
|
| 287 | + :return: number of signature operations the script would do if it was executed |
| 288 | + :rtype: int |
| 289 | + """ |
| 290 | + # If validating an input, should check the spent_tx for MultiSig |
| 291 | + if output_script is not None: |
| 292 | + # If it's multisig we have to validate the redeem_script sigop count |
| 293 | + from hathor.transaction.scripts import MultiSig |
| 294 | + if MultiSig.re_match.search(output_script): |
| 295 | + multisig_data = MultiSig.get_multisig_data(data) |
| 296 | + # input_script + redeem_script |
| 297 | + return count_sigops(multisig_data) |
| 298 | + |
| 299 | + return count_sigops(data) |
| 300 | + |
| 301 | + |
| 302 | +def get_pushdata(data: bytes) -> bytes: |
| 303 | + if data[0] > 75: |
| 304 | + length = data[1] |
| 305 | + start = 2 |
| 306 | + else: |
| 307 | + length = data[0] |
| 308 | + start = 1 |
| 309 | + return data[start:(start + length)] |
0 commit comments