diff --git a/rapid_pro_tools/rapid_pro_client.py b/rapid_pro_tools/rapid_pro_client.py index ae4a76a3..d6e66b4d 100755 --- a/rapid_pro_tools/rapid_pro_client.py +++ b/rapid_pro_tools/rapid_pro_client.py @@ -253,7 +253,7 @@ def get_raw_messages(self, created_after_inclusive=None, created_before_exclusiv return raw_messages - def send_message_to_urn(self, message, target_urn): + def send_message_to_urn(self, message, target_urn, interrupt=False): """ Sends a message to the given URN. @@ -261,11 +261,16 @@ def send_message_to_urn(self, message, target_urn): :type message: str :param target_urn: URN to send the message to. :type target_urn: str + :param interrupt: Whether to interrupt the target_urn from flows before sending the message. + :type interrupt: bool :return: Id of the Rapid Pro broadcast created for this send request. This id may be used to check on the status of the broadcast by making further requests to Rapid Pro. Note that this is a broadcast (to one person) because Rapid Pro does not support unicasting. :rtype: int """ + if interrupt: + self.interrupt_urns([target_urn]) + log.info("Sending a message to an individual...") log.debug(f"Sending to '{target_urn}' the message '{message}'...") response: Broadcast = self.rapid_pro.create_broadcast(message, urns=[target_urn]) @@ -286,6 +291,31 @@ def get_broadcast_for_broadcast_id(self, broadcast_id): f"(expected exactly 1)" return matching_broadcasts[0] + def interrupt_urns(self, urns): + """ + Interrupts the given URNs from the flows they are currently in, if any. + + If the list of URNs contains more than 100 items, requests will be made in batches of 100 URNs at a time. + + :param urns: URNs to interrupt + :type urns: list of str + """ + log.info(f"Interrupting {len(urns)} URNs...") + log.debug(f"Interrupting {urns}...") + batch = [] + interrupted = 0 + for urn in urns: + batch.append(urn) + if len(batch) >= 100: # limit of 100 imposed by Rapid Pro's API + self.rapid_pro.bulk_interrupt_contacts(batch) + interrupted += len(batch) + log.info(f"Interrupted {interrupted} / {len(urns)} URNs") + batch = [] + if len(batch) > 0: + self.rapid_pro.bulk_interrupt_contacts(batch) + interrupted += len(batch) + log.info(f"Interrupted {interrupted} / {len(urns)} URNs") + def _get_archived_runs_for_flow_id(self, flow_id, last_modified_after_inclusive=None, last_modified_before_exclusive=None): """