1
1
from __future__ import annotations
2
2
3
3
import json
4
+ from copy import deepcopy
4
5
from datetime import date
5
6
from decimal import Decimal
6
7
from unittest import TestCase
33
34
}
34
35
35
36
37
+ class PaymentQuerySet (Mock ):
38
+ __payments = {}
39
+
40
+ def create (self , ** kwargs ):
41
+ if kwargs :
42
+ raise NotImplementedError (f"arguments not supported yet: { kwargs } " )
43
+ id_ = max (self .__payments ) + 1 if self .__payments else 1
44
+ self .__payments [id_ ] = {}
45
+ payment = Payment ()
46
+ payment .id = id_
47
+ payment .save ()
48
+ return payment
49
+
50
+ def get (self , * args , ** kwargs ):
51
+ if args or kwargs :
52
+ return self .filter (* args , ** kwargs ).get ()
53
+ payment = Payment ()
54
+ payment_fields , = self .__payments .values ()
55
+ for payment_field_name , payment_field_value in payment_fields .items ():
56
+ setattr (payment , payment_field_name , deepcopy (payment_field_value ))
57
+ return payment
58
+
59
+ def filter (self , * args , pk = None , ** kwargs ):
60
+ if args or kwargs :
61
+ raise NotImplementedError (f"arguments not supported yet: { args } , { kwargs } " )
62
+ if pk is not None :
63
+ return PaymentQuerySet ({pk_ : payment for pk_ , payment in self .__payments .items () if pk_ == pk })
64
+ return self
65
+
66
+ def update (self , ** kwargs ):
67
+ for payment in self .__payments .values ():
68
+ for field_name , field_value in kwargs .items ():
69
+ if not any (field .name == field_name for field in Payment ._meta .get_fields (include_parents = True , include_hidden = True )):
70
+ raise NotImplementedError (f"updating unknown field not supported yet: { field_name } " )
71
+ payment [field_name ] = deepcopy (field_value )
72
+
73
+ def delete (self ):
74
+ self .__payments .clear ()
75
+
76
+
36
77
class Payment (Mock ):
78
+ objects = PaymentQuerySet ()
79
+
37
80
id = 1
38
81
description = "payment"
39
82
currency = "USD"
@@ -57,9 +100,14 @@ class Payment(Mock):
57
100
}
58
101
)
59
102
103
+ @property
104
+ def pk (self ):
105
+ return self .id
106
+
60
107
def change_status (self , status , message = "" ):
61
108
self .status = status
62
109
self .message = message
110
+ self .save (update_fields = ["status" , "message" ])
63
111
64
112
def get_failure_url (self ):
65
113
return "http://cancel.com"
@@ -77,10 +125,37 @@ def get_purchased_items(self):
77
125
def get_success_url (self ):
78
126
return "http://success.com"
79
127
128
+ def save (self , * args , update_fields = None , ** kwargs ):
129
+ if args or kwargs :
130
+ raise NotImplementedError (f"arguments not supported yet: { args } , { kwargs } " )
131
+ if update_fields is None :
132
+ update_fields = {field .name for field in self ._meta .get_fields (include_parents = True , include_hidden = True )}
133
+ Payment .objects .filter (pk = self .pk ).update (** {field : getattr (self , field ) for field in update_fields })
134
+
135
+ def refresh_from_db (self , * args , ** kwargs ):
136
+ if args or kwargs :
137
+ raise NotImplementedError (f"arguments not supported yet: { args } , { kwargs } " )
138
+ payment_from_db = Payment .objects .get (pk = self .pk )
139
+ for field in self ._meta .get_fields (include_parents = True , include_hidden = True ):
140
+ field_value_from_db = getattr (payment_from_db , field .name )
141
+ setattr (self , field .name , field_value_from_db )
142
+
143
+ class Meta (Mock ):
144
+ def get_fields (self , include_parents = True , include_hidden = False ):
145
+ fields = []
146
+ for field_name in {"id" , "description" , "currency" , "delivery" , "status" , "tax" , "token" , "total" , "captured_amount" , "variant" , "transaction_id" , "message" , "extra_data" }:
147
+ field = Mock ()
148
+ field .name = field_name
149
+ fields .append (field )
150
+ return tuple (fields )
151
+
152
+ _meta = Meta ()
153
+
80
154
81
155
class TestPaypalProvider (TestCase ):
82
156
def setUp (self ):
83
- self .payment = Payment ()
157
+ Payment .objects .delete ()
158
+ self .payment = Payment .objects .create ()
84
159
self .provider = PaypalProvider (secret = SECRET , client_id = CLIENT_ID )
85
160
86
161
def test_provider_raises_redirect_needed_on_success (self ):
@@ -171,6 +246,9 @@ def test_provider_redirects_on_success_captured_payment(
171
246
172
247
self .assertEqual (self .payment .status , PaymentStatus .CONFIRMED )
173
248
self .assertEqual (self .payment .captured_amount , self .payment .total )
249
+ self .payment .refresh_from_db ()
250
+ self .assertEqual (self .payment .status , PaymentStatus .CONFIRMED )
251
+ self .assertEqual (self .payment .captured_amount , self .payment .total )
174
252
175
253
@patch ("requests.post" )
176
254
@patch ("payments.paypal.redirect" )
@@ -202,6 +280,9 @@ def test_provider_redirects_on_success_preauth_payment(
202
280
203
281
self .assertEqual (self .payment .status , PaymentStatus .PREAUTH )
204
282
self .assertEqual (self .payment .captured_amount , Decimal ("0" ))
283
+ self .payment .refresh_from_db ()
284
+ self .assertEqual (self .payment .status , PaymentStatus .PREAUTH )
285
+ self .assertEqual (self .payment .captured_amount , Decimal ("0" ))
205
286
206
287
@patch ("payments.paypal.redirect" )
207
288
def test_provider_request_without_payerid_redirects_on_failure (
@@ -211,6 +292,8 @@ def test_provider_request_without_payerid_redirects_on_failure(
211
292
request .GET = {"token" : "test" , "PayerID" : None }
212
293
self .provider .process_data (self .payment , request )
213
294
self .assertEqual (self .payment .status , PaymentStatus .REJECTED )
295
+ self .payment .refresh_from_db ()
296
+ self .assertEqual (self .payment .status , PaymentStatus .REJECTED )
214
297
215
298
@patch ("requests.post" )
216
299
def test_provider_renews_access_token (self , mocked_post ):
0 commit comments