Mongoose websocket to "stream" images

I am trying to use mongoose 6.16 websocket api’s to “stream” multiple individual images to a web page to animate them in a Canvas html control. I open the websocket (in AngularJS javascript code) and send a request json message that contains the number of images to send back.

I used the sample “chat” code to try to accomplish this task and am using the mg_send_websocket_frame function to send the individual image back. From what I can tell, mongoose buffers all of the images and sends them back all at once (after some triggering event). My web page has a large wait time until all of the images are displayed. The “animation” works correctly once all of the images are sent. I need mongoose to send the individual images immediately.

My question is:

Are there other mongoose APIs that I should use to send the individual images immediately?

Here’s a snippet of the code I’m running:

#include "mongoose.h"

using namespace std;

static const char *s_http_port = "8000";
static struct mg_serve_http_opts s_http_server_opts;

static void signal_handler(int sig_num);
static void ev_handler(struct mg_connection *nc, int ev, void *ev_data);
static int is_websocket(const struct mg_connection *nc);
static void logit(struct mg_connection *nc, const struct mg_str msg);
static void reply(struct mg_mgr *mg, websocket_message *wm);
static void handle_it(struct mg_connection *c, websocket_message *wm);

static void HandleAuthenticate(struct mg_connection *nc, struct http_message *hm);
static void HandleTrainHistory(struct mg_connection *nc, struct http_message *hm);
static void HandleConfig(struct mg_connection *nc, struct http_message *hm);
static void HandleTakePicture(struct mg_connection *nc, struct http_message *hm);
static void HandleTakeBurst(struct mg_connection *nc, struct http_message *hm);
static void HandleViewTcon(struct mg_connection *nc, struct http_message *hm);
static void HandleTdata(struct mg_connection *nc, struct http_message *hm);
static void HandleTtd(struct mg_connection *nc, struct http_message *hm);
static void HandleStreaming(struct mg_connection *c, websocket_message *wm);

/// <summary>
/// Entry point
/// </summary>
/// <param name="argc"></param>
/// <param name="argv"></param>
/// <returns></returns>
int main(int argc, char *argv[]) 
{
    struct mg_mgr mgr;
    struct mg_connection *nc;
    struct mg_bind_opts bind_opts;
    int i;
    char *cp;
    const char *err_str;

    signal(SIGTERM, signal_handler);
    signal(SIGINT, signal_handler);

#if MG_ENABLE_SSL
    const char *ssl_cert = NULL;
#endif

    mg_mgr_init(&mgr, NULL);

    /* Use current binary directory as document root */
    if (argc > 0 && ((cp = strrchr(argv[0], DIRSEP)) != NULL)) {
        *cp = '\0';
        s_http_server_opts.document_root = argv[0];
    }

    /* Process command line options to customize HTTP server */
    for (i = 1; i < argc; i++) 
    {
        if (strcmp(argv[i], "-D") == 0 && i + 1 < argc) 
        {
            mgr.hexdump_file = argv[++i];
        }
        else if (strcmp(argv[i], "-d") == 0 && i + 1 < argc) 
        {
            s_http_server_opts.document_root = argv[++i];
        }
        else if (strcmp(argv[i], "-p") == 0 && i + 1 < argc) 
        {
            s_http_port = argv[++i];
        }
        else if (strcmp(argv[i], "-a") == 0 && i + 1 < argc) 
        {
            s_http_server_opts.auth_domain = argv[++i];
        }
        else if (strcmp(argv[i], "-P") == 0 && i + 1 < argc) 
        {
            s_http_server_opts.global_auth_file = argv[++i];
        }
        else if (strcmp(argv[i], "-A") == 0 && i + 1 < argc) 
        {
            s_http_server_opts.per_directory_auth_file = argv[++i];
        }
        else if (strcmp(argv[i], "-r") == 0 && i + 1 < argc) 
        {
            s_http_server_opts.url_rewrites = argv[++i];
#if MG_ENABLE_HTTP_CGI
        }
        else if (strcmp(argv[i], "-i") == 0 && i + 1 < argc) 
        {
            s_http_server_opts.cgi_interpreter = argv[++i];
#endif
#if MG_ENABLE_SSL
        }
        else if (strcmp(argv[i], "-s") == 0 && i + 1 < argc) 
        {
            ssl_cert = argv[++i];
#endif
        }
        else 
        {
            fprintf(stderr, "Unknown option: [%s]\n", argv[i]);
            exit(1);
        }
    }

    /* Set HTTP server options */
    memset(&bind_opts, 0, sizeof(bind_opts));
    bind_opts.error_string = &err_str;
#if MG_ENABLE_SSL
    if (ssl_cert != NULL) 
    {
        bind_opts.ssl_cert = ssl_cert;
    }
#endif
    nc = mg_bind_opt(&mgr, s_http_port, ev_handler, bind_opts);

    if (nc == NULL) 
    {
        fprintf(stderr, "Error starting server on port %s: %s\n", s_http_port,
            *bind_opts.error_string);
        exit(1);
    }

    mg_set_protocol_http_websocket(nc);
    s_http_server_opts.enable_directory_listing = "yes";

    printf("Starting RESTful server on port %s, serving %s\n", 
            s_http_port,
            s_http_server_opts.document_root);
    for (;;) 
    {
        mg_mgr_poll(&mgr, 1000);
    }
    mg_mgr_free(&mgr);

    return 0;
}

static void signal_handler(int sig_num) 
{
    signal(sig_num, signal_handler);  // Reinstantiate signal handler
//    s_signal_received = sig_num;
    LogError("signal_handler", "", sig_num, "Signal caught");
}

static int is_websocket(const struct mg_connection *nc) 
{
    return nc->flags & MG_F_IS_WEBSOCKET;
}

static void logit(struct mg_connection *nc, const struct mg_str msg) 
{
    char buf[1024] = {0};
    char addr[32] = {0};
    std::string now = TIS_Util::Now();

    mg_sock_addr_to_str(&nc->sa, addr, sizeof(addr),
        MG_SOCK_STRINGIFY_IP | MG_SOCK_STRINGIFY_PORT);

    snprintf(buf, sizeof(buf), "%s - %s %.*s", now.c_str(), addr, (int)msg.len, msg.p);
    printf("%s\n", buf); /* Local echo. */
}

/// <summary>
/// Replies to the websocket request
/// </summary>
/// <param name="mg"></param>
/// <param name="wm"></param>
static void reply(struct mg_mgr *mg, websocket_message *wm)
{
    struct mg_connection *nc;
    nc = mg->active_connections;
    struct mg_connection *c;

    for (c = mg_next(nc->mgr, NULL); c != NULL; c = mg_next(nc->mgr, c)) 
    {
        if (is_websocket(c)) 
        {
            handle_it(c, wm);
        }
    }
}

/// <summary>
/// Handles the websocket request
/// </summary>
/// <param name="c"></param>
/// <param name="wm"></param>
static void handle_it(struct mg_connection *c, websocket_message *wm)
{
    std::string msg((char *)wm->data, wm->size);
    std::map<std::string, std::string> map;

    TIS_Util::JsonToMap(msg, &map);
    std::string CallbackId = TIS_Util::GetValue("CallbackId", &map);
    std::string Action = TIS_Util::GetValue("Action", &map);
    int action = TIS_Util::ToInt(Action);

    switch (action)
    {
        case TakeThermalPictureBurstAsJson:
            HandleStreaming(c, wm);
            break;
    }

}

/// <summary>
/// Event handler
/// </summary>
/// <param name="nc"></param>
/// <param name="ev"></param>
/// <param name="ev_data"></param>
static void ev_handler(struct mg_connection *nc, int ev, void *ev_data) 
{
    struct http_message *hm = (struct http_message *) ev_data;

    switch (ev) 
    {
        case MG_EV_CONNECT: 
        {
            int status = *((int *)ev_data);
            if (status != 0) {
                printf("-- Connection error: %d\n", status);
            }
            break;
        }
        case MG_EV_WEBSOCKET_HANDSHAKE_DONE: {
            struct http_message *hm = (struct http_message *) ev_data;

            if (hm->resp_code == 0) 
            {
                printf("-- Connected\n");
                /* New websocket connection. Tell everybody. */
                logit(nc, mg_mk_str("++ joined"));
            }
            else 
            {
                printf("-- Connection failed! HTTP code %d\n", hm->resp_code);
                /* Connection will be closed after this. */
            }
            break;
        }
        case MG_EV_WEBSOCKET_FRAME: 
        {
            struct websocket_message *wm = (struct websocket_message *) ev_data;
            /* New websocket message. Tell everybody. */
            struct mg_str d = { (char *)wm->data, wm->size };
            logit(nc, d);
            reply(nc->mgr, wm);
            break;
        }
        case MG_EV_CLOSE: 
        {
            /* Disconnect. Tell everybody. */
            if (is_websocket(nc)) 
            {
                logit(nc, mg_mk_str("-- left"));
            }
            break;
        }
        case MG_EV_HTTP_REQUEST:
            if (mg_vcmp(&hm->uri, "/api/User/Authenticate") == 0)
            {
                HandleAuthenticate(nc, hm);
            }
            else if (mg_vcmp(&hm->uri, "/api/Site/TrainHistory") == 0)
            {
                HandleTrainHistory(nc, hm);
            }
            else if (mg_vcmp(&hm->uri, "/api/Site/Config") == 0)
            {
                HandleConfig(nc, hm);
            }
            else if (mg_vcmp(&hm->uri, "/api/Site/TakePicture") == 0)
            {
                HandleTakePicture(nc, hm);
            }
            else if (mg_vcmp(&hm->uri, "/api/Site/TakeBurst") == 0)
            {
                HandleTakeBurst(nc, hm);
            }
            else if (mg_vcmp(&hm->uri, "/api/Train/tcon") == 0)
            {
                HandleViewTcon(nc, hm);
            }
            else if (mg_vcmp(&hm->uri, "/api/Train/tdata") == 0)
            {
                HandleTdata(nc, hm);
            }
            else if (mg_vcmp(&hm->uri, "/api/Train/ttd") == 0)
            {
                HandleTtd(nc, hm);
            }
            else 
            {
                mg_serve_http(nc, hm, s_http_server_opts); //Serve content from wwwroot
            }
            break;

        default:
            break;

    }
}

/// <summary>
/// Handles /api/User/Authenticate web service call
/// </summary>
/// <param name="nc"></param>
/// <param name="hm"></param>
static void HandleAuthenticate(struct mg_connection *nc, struct http_message *hm)
{

   //Handling code deleted...
    mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n\r\n");
    mg_printf_http_chunk(nc, json.c_str());
    mg_send_http_chunk(nc, "", 0);
}

/// <summary>
/// Train History web service
/// </summary>
/// <param name="nc"></param>
/// <param name="hm"></param>
static void HandleTrainHistory(struct mg_connection *nc, struct http_message *hm)
{
   //Handling code deleted...

    /* Send headers */
    mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n\r\n");

    /* Compute the result and send it back as a JSON object */
    mg_printf_http_chunk(nc, json.c_str());
    mg_send_http_chunk(nc, "", 0); /* Send empty chunk, the end of response */
}

/// <summary>
/// Configuration web service
/// </summary>
/// <param name="nc"></param>
/// <param name="hm"></param>
static void HandleConfig(struct mg_connection *nc, struct http_message *hm)
{
    if (mg_vcmp(&hm->method, "GET") == 0)
    {
   //Handling code deleted...

        /* Send headers */
        mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n\r\n");

        /* Compute the result and send it back as a JSON object */
        //result = strtod(n1, NULL) + strtod(n2, NULL);
        mg_printf_http_chunk(nc, json.c_str());
        mg_send_http_chunk(nc, "", 0); /* Send empty chunk, the end of response */
    }
    else if (mg_vcmp(&hm->method, "POST") == 0)
    {
        std::string json(hm->body.p, hm->body.len);
        std::string response = "Success";

   //Handling code deleted...

        mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n\r\n");
        mg_printf_http_chunk(nc, "{ \"result\": \"%s\" }", response.c_str());
        mg_send_http_chunk(nc, "", 0);
    }
    else
    {
        mg_printf(nc, "%s", "HTTP/1.1 400 Bad Request\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n\r\n");
        mg_printf_http_chunk(nc, "{ \"result\": \"%s\" }", "Error");
        mg_send_http_chunk(nc, "", 0);
    }
}

/// <summary>
/// Handles /api/Site/TakePicture web service call
/// </summary>
/// <param name="nc"></param>
/// <param name="hm"></param>
static void HandleTakePicture(struct mg_connection *nc, struct http_message *hm)
{
   //Handling code deleted...

    mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n\r\n");

    /* Compute the result and send it back as a JSON object */
    mg_printf_http_chunk(nc, json.c_str());
    mg_send_http_chunk(nc, "", 0); /* Send empty chunk, the end of response */
}

/// <summary>
/// Handles /api/Site/TakeBurst web service call
/// </summary>
/// <param name="nc"></param>
/// <param name="hm"></param>
static void HandleTakeBurst(struct mg_connection *nc, struct http_message *hm)
{
   //Handling code deleted...
    mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n\r\n");

    /* Compute the result and send it back as a JSON object */
    mg_printf_http_chunk(nc, json.c_str());
    mg_send_http_chunk(nc, "", 0); /* Send empty chunk, the end of response */
}

/// <summary>
/// Train Consists web service
/// </summary>
/// <param name="nc"></param>
/// <param name="hm"></param>
static void HandleViewTcon(struct mg_connection *nc, struct http_message *hm)
{

   //Handling code deleted...

    // Send headers 
    mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/text\r\n\r\n");

    // Send data
    for (auto it : tconLines)
    {
        tcon = it;
        mg_printf_http_chunk(nc, tcon->GetLine());
//        ss << tcon->GetLine();
    }

//    text = ss.str();
//    mg_printf_http_chunk(nc, text.c_str());
    mg_send_http_chunk(nc, "", 0);

    ::DeleteList<TIS_Model::CTConModel>(&tconLines);
}

/// <summary>
/// Handles /api/Train/tdata web service call
/// </summary>
/// <param name="nc"></param>
/// <param name="hm"></param>
static void HandleTdata(struct mg_connection *nc, struct http_message *hm)
{
   //Handling code deleted...
        mg_printf(nc, "%s", "HTTP/1.1 400 Bad Request\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n\r\n");
        mg_printf_http_chunk(nc, "{ \"result\": \"%s\" }", "FileName query string missing");
        mg_send_http_chunk(nc, "", 0);

}

/// <summary>
/// Handles /api/Train/ttd web service call
/// </summary>
/// <param name="nc"></param>
/// <param name="hm"></param>
static void HandleTtd(struct mg_connection *nc, struct http_message *hm)
{
   //Handling code deleted...
        mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n\r\n");
        mg_printf_http_chunk(nc, json.c_str());
        mg_send_http_chunk(nc, "", 0);
}

/// <summary>
/// Handles streaming images over web socket (for unit testing)
/// </summary>
/// <param name="c"></param>
/// <param name="wm"></param>
static void HandleStreaming(struct mg_connection *c, websocket_message *wm)
{
    std::string msg((char *)wm->data, wm->size);
    std::map<std::string, std::string> map;

    TIS_Util::JsonToMap(msg, &map);
    std::string CallbackId = TIS_Util::GetValue("CallbackId", &map);
    std::string Action = TIS_Util::GetValue("Action", &map);
    std::string ImageCount = TIS_Util::GetValue("ImageCount", &map);
    std::string CameraNo = TIS_Util::GetValue("CameraNo", &map);
    std::string Enhanced = TIS_Util::GetValue("Enhanced", &map);
    std::string json;
    std::ostringstream ss;

    int count = TIS_Util::ToInt(ImageCount);
    int camera = TIS_Util::ToInt(CameraNo);

    for (int i = 0; i < count; i++)
    {
        ss << "{ ";
        ss << "\"CallbackId\" : \"" << CallbackId << "\",";
        ss << "\"Action\" : \"" << Action << "\",";
        ss << "\"Message\" : \"" << "Streaming" << "\",";
        ss << "\"CameraNo\" : \"" << CameraNo << "\",";
        ss << "\"Enhanced\" : \"" << Enhanced << "\",";
        ss << "\"Image\" : ";
        ss << "[";
        for (int n = 0; n < TIS_IMAGE_STRUCT__IMAGE__LENGTH; n++)
        {
            ss << rand() % 256;
            if (n < TIS_IMAGE_STRUCT__IMAGE__LENGTH - 1)
                ss << ",";
        }
        ss << "]";
        ss << "}";

        json = ss.str();
        mg_send_websocket_frame(c, WEBSOCKET_OP_TEXT, json.c_str(), json.length());
        logit(c, mg_mk_str("-- message sent"));
        ss.str("");
    }
}

I encounter the same problem. In addition, from the debug message, I found mg_mgr_poll(MG_EV_SEND event) the send data size is 1460bytes every time, i don’t know why.