Skip to content

Callbacks when a stanza is ack'd or fails to send #252

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/auth.c
Original file line number Diff line number Diff line change
Expand Up @@ -1359,13 +1359,18 @@ static int _get_h_attribute(xmpp_stanza_t *stanza, unsigned long *ul_h)
return 0;
}

static void _sm_queue_cleanup(xmpp_conn_t *conn, unsigned long ul_h)
static void _sm_queue_cleanup(xmpp_conn_t *conn, unsigned long ul_h, int failed)
{
xmpp_send_queue_t *e;
while ((e = peek_queue_front(&conn->sm_state->sm_queue))) {
if (e->sm_h >= ul_h)
break;
e = pop_queue_front(&conn->sm_state->sm_queue);
if (failed && conn->sm_fail_callback && e->id) {
conn->sm_fail_callback(conn, conn->sm_fail_callback_ctx, e->id);
} else if (conn->sm_ack_callback && e->id) {
conn->sm_ack_callback(conn, conn->sm_ack_callback_ctx, e->id);
}
strophe_free(conn->ctx, queue_element_free(conn->ctx, e));
}
}
Expand Down Expand Up @@ -1441,7 +1446,7 @@ static int _handle_sm(xmpp_conn_t *const conn,
conn->sm_state->sm_sent_nr = conn->sm_state->sm_queue.head->sm_h;
else
conn->sm_state->sm_sent_nr = ul_h;
_sm_queue_cleanup(conn, ul_h);
_sm_queue_cleanup(conn, ul_h, 0);
_sm_queue_resend(conn);
strophe_debug(conn->ctx, "xmpp", "Session resumed successfully.");
_stream_negotiation_success(conn);
Expand All @@ -1468,7 +1473,7 @@ static int _handle_sm(xmpp_conn_t *const conn,
/* In cases there's no `h` included, drop all elements. */
ul_h = (unsigned long)-1;
}
_sm_queue_cleanup(conn, ul_h);
_sm_queue_cleanup(conn, ul_h, 1);
}
} else if (!strcmp(cause, "feature-not-implemented")) {
conn->sm_state->resume = 0;
Expand Down
5 changes: 5 additions & 0 deletions src/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ struct _xmpp_send_queue_t {
xmpp_send_queue_owner_t owner;
void *userdata;
uint32_t sm_h;
char *id;

xmpp_send_queue_t *prev, *next;
};
Expand Down Expand Up @@ -330,6 +331,10 @@ struct _xmpp_conn_t {
xmpp_sockopt_callback sockopt_cb;
xmpp_sm_callback sm_callback;
void *sm_callback_ctx;
xmpp_sm_ack_callback sm_ack_callback;
void *sm_ack_callback_ctx;
xmpp_sm_ack_callback sm_fail_callback;
void *sm_fail_callback_ctx;
};

void conn_disconnect(xmpp_conn_t *conn);
Expand Down
59 changes: 55 additions & 4 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* A part of those functions is listed under the \ref TLS section.
*/

#include <assert.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdarg.h>
Expand Down Expand Up @@ -115,6 +116,7 @@ static void _send_valist(xmpp_conn_t *conn,
static int _send_raw(xmpp_conn_t *conn,
char *data,
size_t len,
const char *id,
xmpp_send_queue_owner_t owner,
void *userdata);

Expand Down Expand Up @@ -1279,6 +1281,22 @@ void xmpp_conn_set_sm_callback(xmpp_conn_t *conn,
conn->sm_callback_ctx = ctx;
}

void xmpp_conn_set_sm_ack_callback(xmpp_conn_t *conn,
xmpp_sm_ack_callback cb,
void *ctx)
{
conn->sm_ack_callback = cb;
conn->sm_ack_callback_ctx = ctx;
}

void xmpp_conn_set_sm_fail_callback(xmpp_conn_t *conn,
xmpp_sm_ack_callback cb,
void *ctx)
{
conn->sm_fail_callback = cb;
conn->sm_fail_callback_ctx = ctx;
}

struct sm_restore {
xmpp_conn_t *conn;
const unsigned char *state;
Expand Down Expand Up @@ -1324,7 +1342,8 @@ static int sm_load_string(struct sm_restore *sm, char **val, size_t *len)
memcpy(*val, sm->state, l);
(*val)[l] = '\0';
sm->state += l;
*len = l;
if (len)
*len = l;
return 0;
}

Expand Down Expand Up @@ -1454,10 +1473,17 @@ int xmpp_conn_restore_sm_state(xmpp_conn_t *conn,
ret = sm_load_string(&sm, &item->data, &item->len);
if (ret)
goto err_reload;
if (sm.state < sm.state_end) {
ret = sm_load_string(&sm, &item->id, NULL);
if (ret)
goto err_reload;
}

item->owner = XMPP_QUEUE_USER;
}

assert(sm.state == sm.state_end);

return XMPP_EOK;

err_reload:
Expand Down Expand Up @@ -1496,6 +1522,7 @@ static size_t sm_state_serialize(xmpp_conn_t *conn, unsigned char **buf)
while (peek) {
sm_queue_len++;
sm_queue_size += 10 + peek->len;
if (peek->id) sm_queue_size += 5 + strlen(peek->id);
peek = peek->next;
}

Expand All @@ -1505,6 +1532,7 @@ static size_t sm_state_serialize(xmpp_conn_t *conn, unsigned char **buf)
while (peek) {
send_queue_len++;
send_queue_size += 5 + peek->len;
if (peek->id) send_queue_size += 5 + strlen(peek->id);
peek = peek->next;
}

Expand Down Expand Up @@ -1563,6 +1591,17 @@ static size_t sm_state_serialize(xmpp_conn_t *conn, unsigned char **buf)
goto err_serialize;
memcpy(next, peek->data, peek->len);
next += peek->len;

if (peek->id) {
uint32_t len = strlen(peek->id);
if (sm_store_u32(&next, end, 0x7a, len))
goto err_serialize;
if (next + len > end)
goto err_serialize;
memcpy(next, peek->id, len);
next += len;
}

peek = peek->next;
}

Expand Down Expand Up @@ -1813,6 +1852,10 @@ char *xmpp_conn_send_queue_drop_element(xmpp_conn_t *conn,
if (!t)
return NULL;

if (conn->sm_ack_callback && t->id) {
conn->sm_ack_callback(conn, conn->sm_ack_callback_ctx, t->id);
}

/* In case there exists a SM stanza that is linked to the
* one we're currently dropping, also delete that one.
*/
Expand Down Expand Up @@ -2088,6 +2131,10 @@ static void _conn_sm_handle_stanza(xmpp_conn_t *const conn,
e = pop_queue_front(&conn->sm_state->sm_queue);
strophe_debug_verbose(2, conn->ctx, "conn",
"SM_Q_DROP: %p, h=%lu", e, e->sm_h);
if (conn->sm_ack_callback && e->id) {
conn->sm_ack_callback(conn, conn->sm_ack_callback_ctx,
e->id);
}
c = queue_element_free(conn->ctx, e);
strophe_free(conn->ctx, c);
}
Expand Down Expand Up @@ -2115,6 +2162,7 @@ char *queue_element_free(xmpp_ctx_t *ctx, xmpp_send_queue_t *e)
{
char *ret = e->data;
strophe_debug_verbose(2, ctx, "conn", "Q_FREE: %p", e);
strophe_free(ctx, e->id);
memset(e, 0, sizeof(*e));
strophe_free(ctx, e);
strophe_debug_verbose(3, ctx, "conn", "Q_CONTENT: %s", ret);
Expand Down Expand Up @@ -2231,7 +2279,7 @@ void send_raw(xmpp_conn_t *conn,
return;
}

_send_raw(conn, d, len, owner, userdata);
_send_raw(conn, d, len, NULL, owner, userdata);
}

static void _send_valist(xmpp_conn_t *conn,
Expand Down Expand Up @@ -2266,7 +2314,7 @@ static void _send_valist(xmpp_conn_t *conn,
va_end(apdup);

/* len - 1 so we don't send trailing \0 */
_send_raw(conn, bigbuf, len - 1, owner, NULL);
_send_raw(conn, bigbuf, len - 1, NULL, owner, NULL);
} else {
/* go through send_raw() which does the strdup() for us */
send_raw(conn, buf, len, owner, NULL);
Expand Down Expand Up @@ -2300,7 +2348,8 @@ void send_stanza(xmpp_conn_t *conn,
goto out;
}

_send_raw(conn, buf, len, owner, NULL);
_send_raw(conn, buf, len, xmpp_stanza_get_attribute(stanza, "id"), owner,
NULL);
out:
xmpp_stanza_release(stanza);
}
Expand Down Expand Up @@ -2342,6 +2391,7 @@ xmpp_send_queue_t *pop_queue_front(xmpp_queue_t *queue)
static int _send_raw(xmpp_conn_t *conn,
char *data,
size_t len,
const char *id,
xmpp_send_queue_owner_t owner,
void *userdata)
{
Expand All @@ -2358,6 +2408,7 @@ static int _send_raw(xmpp_conn_t *conn,

item->data = data;
item->len = len;
item->id = id ? strophe_strdup(conn->ctx, id) : NULL;
item->next = NULL;
item->prev = conn->send_queue_tail;
item->written = 0;
Expand Down
10 changes: 10 additions & 0 deletions strophe.h
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,16 @@ int xmpp_conn_restore_sm_state(xmpp_conn_t *conn,
const unsigned char *sm_state,
size_t sm_state_len);

typedef void (*xmpp_sm_ack_callback)(xmpp_conn_t *conn,
void *ctx,
const char *id);
void xmpp_conn_set_sm_ack_callback(xmpp_conn_t *conn,
xmpp_sm_ack_callback cb,
void *ctx);
void xmpp_conn_set_sm_fail_callback(xmpp_conn_t *conn,
xmpp_sm_ack_callback cb,
void *ctx);

void xmpp_free_sm_state(xmpp_sm_state_t *sm_state);

int xmpp_connect_client(xmpp_conn_t *conn,
Expand Down
Loading