10
10
11
11
"""Observability tools to spit out analysis-ready tables, one row per test case."""
12
12
13
+ import base64
14
+ import dataclasses
13
15
import json
16
+ import math
14
17
import os
15
18
import sys
16
19
import time
20
23
from dataclasses import dataclass
21
24
from datetime import date , timedelta
22
25
from functools import lru_cache
23
- from typing import TYPE_CHECKING , Any , Callable , Literal , Optional , Union
26
+ from typing import TYPE_CHECKING , Any , Callable , Literal , Optional , Union , cast
24
27
25
28
from hypothesis .configuration import storage_directory
26
29
from hypothesis .errors import HypothesisWarning
30
+ from hypothesis .internal .conjecture .choice import (
31
+ BooleanConstraints ,
32
+ BytesConstraints ,
33
+ ChoiceConstraintsT ,
34
+ ChoiceNode ,
35
+ ChoiceT ,
36
+ ChoiceTypeT ,
37
+ FloatConstraints ,
38
+ IntegerConstraints ,
39
+ StringConstraints ,
40
+ )
41
+ from hypothesis .internal .escalation import InterestingOrigin
42
+ from hypothesis .internal .floats import float_to_int
43
+ from hypothesis .internal .intervalsets import IntervalSet
27
44
28
45
if TYPE_CHECKING :
29
46
from typing import TypeAlias
@@ -43,6 +60,89 @@ def update_count(self, *, condition: bool) -> None:
43
60
self .unsatisfied += 1
44
61
45
62
63
+ def _choice_to_json (choice : Union [ChoiceT , None ]) -> Any :
64
+ if choice is None :
65
+ return None
66
+ # see the note on the same check in to_jsonable for why we cast large
67
+ # integers to floats.
68
+ if isinstance (choice , int ) and not isinstance (choice , bool ) and choice > 2 ** 63 :
69
+ return ["integer" , float (choice )]
70
+ elif isinstance (choice , bytes ):
71
+ return ["bytes" , base64 .b64encode (choice ).decode ()]
72
+ elif isinstance (choice , float ) and math .isnan (choice ):
73
+ # handle nonstandard nan bit patterns. We don't need to do this for -0.0
74
+ # vs 0.0 since json doesn't normalize -0.0 to 0.0.
75
+ return ["float" , float_to_int (choice )]
76
+ return choice
77
+
78
+
79
+ def choices_to_json (choices : tuple [ChoiceT , ...]) -> list [Any ]:
80
+ return [_choice_to_json (choice ) for choice in choices ]
81
+
82
+
83
+ def _constraints_to_json (
84
+ choice_type : ChoiceTypeT , constraints : ChoiceConstraintsT
85
+ ) -> dict [str , Any ]:
86
+ constraints = constraints .copy ()
87
+ if choice_type == "integer" :
88
+ constraints = cast (IntegerConstraints , constraints )
89
+ return {
90
+ "min_value" : _choice_to_json (constraints ["min_value" ]),
91
+ "max_value" : _choice_to_json (constraints ["max_value" ]),
92
+ "weights" : (
93
+ None
94
+ if constraints ["weights" ] is None
95
+ # wrap up in a list, instead of a dict, because json dicts
96
+ # require string keys
97
+ else [
98
+ (_choice_to_json (k ), v ) for k , v in constraints ["weights" ].items ()
99
+ ]
100
+ ),
101
+ "shrink_towards" : _choice_to_json (constraints ["shrink_towards" ]),
102
+ }
103
+ elif choice_type == "float" :
104
+ constraints = cast (FloatConstraints , constraints )
105
+ return {
106
+ "min_value" : _choice_to_json (constraints ["min_value" ]),
107
+ "max_value" : _choice_to_json (constraints ["max_value" ]),
108
+ "allow_nan" : constraints ["allow_nan" ],
109
+ "smallest_nonzero_magnitude" : constraints ["smallest_nonzero_magnitude" ],
110
+ }
111
+ elif choice_type == "string" :
112
+ constraints = cast (StringConstraints , constraints )
113
+ assert isinstance (constraints ["intervals" ], IntervalSet )
114
+ return {
115
+ "intervals" : constraints ["intervals" ].intervals ,
116
+ "min_size" : _choice_to_json (constraints ["min_size" ]),
117
+ "max_size" : _choice_to_json (constraints ["max_size" ]),
118
+ }
119
+ elif choice_type == "bytes" :
120
+ constraints = cast (BytesConstraints , constraints )
121
+ return {
122
+ "min_size" : _choice_to_json (constraints ["min_size" ]),
123
+ "max_size" : _choice_to_json (constraints ["max_size" ]),
124
+ }
125
+ elif choice_type == "boolean" :
126
+ constraints = cast (BooleanConstraints , constraints )
127
+ return {
128
+ "p" : constraints ["p" ],
129
+ }
130
+ else :
131
+ raise NotImplementedError (f"unknown choice type { choice_type } " )
132
+
133
+
134
+ def nodes_to_json (nodes : tuple [ChoiceNode , ...]) -> list [dict [str , Any ]]:
135
+ return [
136
+ {
137
+ "type" : node .type ,
138
+ "value" : _choice_to_json (node .value ),
139
+ "constraints" : _constraints_to_json (node .type , node .constraints ),
140
+ "was_forced" : node .was_forced ,
141
+ }
142
+ for node in nodes
143
+ ]
144
+
145
+
46
146
@dataclass
47
147
class ObservationMetadata :
48
148
traceback : Optional [str ]
@@ -52,6 +152,28 @@ class ObservationMetadata:
52
152
sys_argv : list [str ]
53
153
os_getpid : int
54
154
imported_at : float
155
+ data_status : "Status"
156
+ interesting_origin : Optional [InterestingOrigin ]
157
+ choice_nodes : Optional [tuple [ChoiceNode , ...]]
158
+
159
+ def to_json (self ) -> dict [str , Any ]:
160
+ data = {
161
+ "traceback" : self .traceback ,
162
+ "reproduction_decorator" : self .reproduction_decorator ,
163
+ "predicates" : self .predicates ,
164
+ "backend" : self .backend ,
165
+ "sys.argv" : self .sys_argv ,
166
+ "os.getpid()" : self .os_getpid ,
167
+ "imported_at" : self .imported_at ,
168
+ "data_status" : self .data_status ,
169
+ "interesting_origin" : self .interesting_origin ,
170
+ "choice_nodes" : (
171
+ None if self .choice_nodes is None else nodes_to_json (self .choice_nodes )
172
+ ),
173
+ }
174
+ # check that we didn't forget one
175
+ assert len (data ) == len (dataclasses .fields (self ))
176
+ return data
55
177
56
178
57
179
@dataclass
@@ -183,6 +305,9 @@ def make_testcase(
183
305
),
184
306
"predicates" : dict (data ._observability_predicates ),
185
307
"backend" : backend_metadata or {},
308
+ "data_status" : data .status ,
309
+ "interesting_origin" : data .interesting_origin ,
310
+ "choice_nodes" : data .nodes if OBSERVABILITY_CHOICE_NODES else None ,
186
311
** _system_metadata (),
187
312
# unpack last so it takes precedence for duplicate keys
188
313
** (metadata or {}),
@@ -204,11 +329,7 @@ def _deliver_to_file(observation: Observation) -> None: # pragma: no cover
204
329
fname .parent .mkdir (exist_ok = True , parents = True )
205
330
_WROTE_TO .add (fname )
206
331
with fname .open (mode = "a" ) as f :
207
- obs_json : dict [str , Any ] = to_jsonable (observation , avoid_realization = False ) # type: ignore
208
- if obs_json ["type" ] == "test_case" :
209
- obs_json ["metadata" ]["sys.argv" ] = obs_json ["metadata" ].pop ("sys_argv" )
210
- obs_json ["metadata" ]["os.getpid()" ] = obs_json ["metadata" ].pop ("os_getpid" )
211
- f .write (json .dumps (obs_json ) + "\n " )
332
+ f .write (json .dumps (to_jsonable (observation , avoid_realization = False )) + "\n " )
212
333
213
334
214
335
_imported_at = time .time ()
@@ -231,6 +352,10 @@ def _system_metadata() -> dict[str, Any]:
231
352
OBSERVABILITY_COLLECT_COVERAGE = (
232
353
"HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_NOCOVER" not in os .environ
233
354
)
355
+ OBSERVABILITY_CHOICE_NODES = (
356
+ "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY_CHOICE_NODES" in os .environ
357
+ )
358
+
234
359
if OBSERVABILITY_COLLECT_COVERAGE is False and (
235
360
sys .version_info [:2 ] >= (3 , 12 )
236
361
): # pragma: no cover
@@ -240,8 +365,10 @@ def _system_metadata() -> dict[str, Any]:
240
365
HypothesisWarning ,
241
366
stacklevel = 2 ,
242
367
)
243
- if "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY" in os .environ or (
244
- OBSERVABILITY_COLLECT_COVERAGE is False
368
+
369
+ if (
370
+ "HYPOTHESIS_EXPERIMENTAL_OBSERVABILITY" in os .environ
371
+ or OBSERVABILITY_COLLECT_COVERAGE is False
245
372
): # pragma: no cover
246
373
TESTCASE_CALLBACKS .append (_deliver_to_file )
247
374
0 commit comments