Skip to content

Commit 08210a1

Browse files
authored
Merge pull request #657 from praekeltfoundation/resend-failed-templates
Resend failed template script
2 parents df84c58 + f6f59c3 commit 08210a1

File tree

2 files changed

+142
-0
lines changed

2 files changed

+142
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
contact_id,template,message_rank
2+
2783123,"{u'language': {u'policy': u'deterministic', u'code': u'en'}, u'namespace': u'test-namespace', u'name': u'baby_switch_confirmation_v3', u'components': [{u'type': u'body', u'parameters': null}]}",1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
import asyncio
2+
import csv
3+
import json
4+
import os
5+
import sys
6+
import time
7+
from datetime import datetime, timedelta
8+
from urllib.parse import urljoin
9+
10+
import aiohttp
11+
12+
"""
13+
Guide:
14+
1. Run query below to get csv, It gets the last failed template send per contact since
15+
31 Jan when the payment issue started.
16+
2. Run script:
17+
`python resend_template_failures.py example.csv > output.json`
18+
3. Check output:
19+
`jq .response.status output.json | sort | uniq -c`
20+
4. Retry:
21+
`cat output.json|python scripts/migrate_to_rapidpro/retry_requests.py > output2.json`
22+
5. Repeat last two steps until there are only 200s returned by turn.
23+
"""
24+
25+
WORKER_COUNT = 3
26+
27+
TURN_URL = "https://whatsapp-praekelt-cloud.turn.io"
28+
29+
30+
async def send_whatsapp_template(session, wa_id, template_data, target):
31+
url = urljoin(TURN_URL, "v1/messages")
32+
headers = {
33+
"Authorization": f"Bearer {os.environ['TURN_TOKEN']}",
34+
"content-type": "application/json",
35+
}
36+
data = {"to": wa_id, "type": "template", "template": template_data}
37+
status, reset_time = await request(session, url, "POST", headers, data, target)
38+
39+
if status == 429:
40+
sleep_until(reset_time)
41+
await send_whatsapp_template(session, wa_id, data, target)
42+
43+
44+
def sleep_until(reset_time):
45+
target = datetime.fromtimestamp(int(str(reset_time).split(".")[0]))
46+
delta = target - datetime.now()
47+
if delta > timedelta(0):
48+
time.sleep(delta.total_seconds())
49+
return True
50+
51+
52+
async def request(session, url, method, headers, data, target):
53+
func = getattr(session, method.lower())
54+
async with func(url, headers=headers, json=data) as response:
55+
response_body = await response.text()
56+
57+
if response.status == 429:
58+
return response.status, response.headers["x-ratelimit-reset"]
59+
60+
request_data = {
61+
"request": {
62+
"url": response.request_info.url.human_repr(),
63+
"method": response.request_info.method,
64+
"headers": dict(response.request_info.headers),
65+
"json": data,
66+
},
67+
"response": {
68+
"status": response.status,
69+
"headers": dict(response.headers),
70+
"body": response_body,
71+
},
72+
}
73+
74+
target.write(json.dumps(request_data))
75+
target.write("\n")
76+
77+
return response.status, None
78+
79+
80+
async def worker(name, queue):
81+
while True:
82+
session, wa_id, template_data, target = await queue.get()
83+
await send_whatsapp_template(session, wa_id, template_data, target)
84+
queue.task_done()
85+
86+
87+
async def main(filename, target):
88+
queue = asyncio.Queue(WORKER_COUNT)
89+
90+
reader = csv.DictReader(open(filename))
91+
async with aiohttp.ClientSession() as session:
92+
tasks = []
93+
for i in range(WORKER_COUNT):
94+
task = asyncio.create_task(worker(f"worker-{i}", queue))
95+
tasks.append(task)
96+
97+
for row in reader:
98+
wa_id = row["contact_id"]
99+
template_data = json.loads(
100+
row["template"]
101+
.replace("'", '"')
102+
.replace('u"', '"')
103+
.replace("None", "null")
104+
)
105+
update = (session, wa_id, template_data, target)
106+
await queue.put(update)
107+
108+
await queue.join()
109+
for task in tasks:
110+
task.cancel()
111+
112+
await asyncio.gather(*tasks, return_exceptions=True)
113+
114+
115+
if __name__ == "__main__":
116+
filename = sys.argv[1]
117+
asyncio.run(main(filename, sys.stdout))
118+
119+
"""
120+
Query for CSV(seed hub/eventstore):
121+
===================================
122+
SELECT *
123+
FROM
124+
(
125+
SELECT
126+
eventstore_message.contact_id,
127+
eventstore_message.data->'template' as template,
128+
row_number() OVER (PARTITION BY eventstore_message.contact_id
129+
ORDER BY eventstore_message.timestamp DESC) AS message_rank
130+
FROM eventstore_message,
131+
eventstore_event
132+
WHERE eventstore_message.TIMESTAMP >= '2025-01-31'
133+
and eventstore_message.TYPE = 'template'
134+
AND eventstore_message.id = eventstore_event.message_id
135+
AND eventstore_event.status = 'failed'
136+
AND (eventstore_event.data->'errors'->>0)::json->>'code' = '131042'
137+
ORDER BY eventstore_message.timestamp ASC
138+
) AS all_template_sends
139+
WHERE message_rank = 1
140+
"""

0 commit comments

Comments
 (0)