+ while (req->cleanup != NULL) {
+ struct cleanup_list *c = req->cleanup;
+ req->cleanup = c->next;
+ c->func(c->arg);
+ g_free(c);
+ }
+
+ if (req->connection->udp_transport) {
+ /* For UDP, a connection only exists for the duration of a single
+ * message. */
+ g_mutex_free(req->connection->send_lock);
+ g_string_free(req->connection->sendbuf, TRUE);
+ g_free(req->connection);
+ }
+
+ g_free(req);
+}
+
+void
+async_rpc_send_reply(RPCRequest *req, void *result)
+{
+ bluesky_time_hires time_end;
+
+ GString *str = g_string_new("");
+ XDR xdr_out;
+ xdr_string_create(&xdr_out, str, XDR_ENCODE);
+ if (!req->xdr_result(&xdr_out, result)) {
+ async_rpc_send_failure(req, SYSTEM_ERR);
+ g_string_free(str, TRUE);
+ return;
+ }
+
+ struct rpc_reply header;
+ header.xid = htonl(req->xid);
+ header.type = htonl(1); /* REPLY */
+ header.stat = htonl(MSG_ACCEPTED);
+ header.verf_flavor = 0;
+ header.verf_len = 0;
+ header.accept_stat = 0;
+
+ g_mutex_lock(req->connection->send_lock);
+ gsize msg_size = str->len;
+ uint32_t fragment = htonl((msg_size + sizeof(header)) | 0x80000000);
+ if (!req->connection->udp_transport)
+ async_rpc_write(req->connection, (const char *)&fragment,
+ sizeof(fragment));
+ async_rpc_write(req->connection, (const char *)&header, sizeof(header));
+ async_rpc_write(req->connection, str->str, str->len);
+ async_rpc_flush(req->connection);
+ g_mutex_unlock(req->connection->send_lock);
+
+ time_end = bluesky_now_hires();
+
+ printf("RPC[%"PRIx32"]: time = %"PRId64" ns\n",
+ req->xid, time_end - req->time_start);
+
+ /* Clean up. */
+ g_string_free(str, TRUE);
+