I am trying to use the Mongoose 6.16 WebSocket APIs to “stream” multiple individual images to a web page and animate them in an HTML canvas element. I open the WebSocket (in AngularJS JavaScript code) and send a JSON request message that contains the number of images to send back.
I based my code on the sample “chat” application and use the mg_send_websocket_frame function to send each individual image back. From what I can tell, Mongoose buffers all of the images and sends them back all at once (after some triggering event). My web page has a long wait until all of the images are displayed. The “animation” works correctly once all of the images have been sent. I need Mongoose to send each individual image immediately.
My question is:
Are there other mongoose APIs that I should use to send the individual images immediately?
Here’s a snippet of the code I’m running:
#include "mongoose.h"
using namespace std;
/* Listening port; defaults to "8000" and is replaced by the -p option in main(). */
static const char *s_http_port = "8000";
/* Options handed to mg_serve_http() when ev_handler() serves static content. */
static struct mg_serve_http_opts s_http_server_opts;
/* Forward declarations: signal handling, the connection event handler and
 * the websocket helpers (logging, fan-out and request dispatch). */
static void signal_handler(int sig_num);
static void ev_handler(struct mg_connection *nc, int ev, void *ev_data);
static int is_websocket(const struct mg_connection *nc);
static void logit(struct mg_connection *nc, const struct mg_str msg);
static void reply(struct mg_mgr *mg, websocket_message *wm);
static void handle_it(struct mg_connection *c, websocket_message *wm);
/* Forward declarations: one handler per REST URI matched in ev_handler(). */
static void HandleAuthenticate(struct mg_connection *nc, struct http_message *hm);
static void HandleTrainHistory(struct mg_connection *nc, struct http_message *hm);
static void HandleConfig(struct mg_connection *nc, struct http_message *hm);
static void HandleTakePicture(struct mg_connection *nc, struct http_message *hm);
static void HandleTakeBurst(struct mg_connection *nc, struct http_message *hm);
static void HandleViewTcon(struct mg_connection *nc, struct http_message *hm);
static void HandleTdata(struct mg_connection *nc, struct http_message *hm);
static void HandleTtd(struct mg_connection *nc, struct http_message *hm);
/* Forward declaration: websocket image streaming, called from handle_it(). */
static void HandleStreaming(struct mg_connection *c, websocket_message *wm);
/// Number of the last signal delivered to signal_handler(), or 0 when none
/// has arrived. Written by the signal handler, polled by the loop in main().
static volatile sig_atomic_t s_signal_received = 0;

/// <summary>
/// Entry point. Parses the command line, binds the HTTP/websocket listener
/// and runs the mongoose event loop until SIGTERM or SIGINT is received.
/// </summary>
/// <param name="argc">Number of command line arguments</param>
/// <param name="argv">
/// Command line arguments. Recognised options (each takes one value):
/// -D hexdump file, -d document root, -p port, -a auth domain,
/// -P global auth file, -A per-directory auth file, -r URL rewrites,
/// -i CGI interpreter (only with MG_ENABLE_HTTP_CGI),
/// -s SSL certificate (only with MG_ENABLE_SSL).
/// </param>
/// <returns>0 after a signal-triggered shutdown; exits with 1 on a bad option or bind failure</returns>
int main(int argc, char *argv[])
{
    struct mg_mgr mgr;
    struct mg_connection *nc;
    struct mg_bind_opts bind_opts;
    int i;
    char *cp;
    const char *err_str = NULL;
#if MG_ENABLE_SSL
    const char *ssl_cert = NULL;
#endif

    signal(SIGTERM, signal_handler);
    signal(SIGINT, signal_handler);

    mg_mgr_init(&mgr, NULL);

    /* Use current binary directory as document root */
    if (argc > 0 && ((cp = strrchr(argv[0], DIRSEP)) != NULL))
    {
        *cp = '\0';
        s_http_server_opts.document_root = argv[0];
    }

    /* Process command line options to customize HTTP server */
    for (i = 1; i < argc; i++)
    {
        if (strcmp(argv[i], "-D") == 0 && i + 1 < argc)
        {
            mgr.hexdump_file = argv[++i];
        }
        else if (strcmp(argv[i], "-d") == 0 && i + 1 < argc)
        {
            s_http_server_opts.document_root = argv[++i];
        }
        else if (strcmp(argv[i], "-p") == 0 && i + 1 < argc)
        {
            s_http_port = argv[++i];
        }
        else if (strcmp(argv[i], "-a") == 0 && i + 1 < argc)
        {
            s_http_server_opts.auth_domain = argv[++i];
        }
        else if (strcmp(argv[i], "-P") == 0 && i + 1 < argc)
        {
            s_http_server_opts.global_auth_file = argv[++i];
        }
        else if (strcmp(argv[i], "-A") == 0 && i + 1 < argc)
        {
            s_http_server_opts.per_directory_auth_file = argv[++i];
        }
        else if (strcmp(argv[i], "-r") == 0 && i + 1 < argc)
        {
            s_http_server_opts.url_rewrites = argv[++i];
        }
#if MG_ENABLE_HTTP_CGI
        else if (strcmp(argv[i], "-i") == 0 && i + 1 < argc)
        {
            s_http_server_opts.cgi_interpreter = argv[++i];
        }
#endif
#if MG_ENABLE_SSL
        else if (strcmp(argv[i], "-s") == 0 && i + 1 < argc)
        {
            ssl_cert = argv[++i];
        }
#endif
        else
        {
            fprintf(stderr, "Unknown option: [%s]\n", argv[i]);
            exit(1);
        }
    }

    /* argv[0] may contain no directory separator and -d may be absent; fall
     * back to the current directory so the value is never NULL when it is
     * printed below. */
    if (s_http_server_opts.document_root == NULL)
    {
        s_http_server_opts.document_root = ".";
    }

    /* Set HTTP server options */
    memset(&bind_opts, 0, sizeof(bind_opts));
    bind_opts.error_string = &err_str;
#if MG_ENABLE_SSL
    if (ssl_cert != NULL)
    {
        bind_opts.ssl_cert = ssl_cert;
    }
#endif

    nc = mg_bind_opt(&mgr, s_http_port, ev_handler, bind_opts);
    if (nc == NULL)
    {
        fprintf(stderr, "Error starting server on port %s: %s\n", s_http_port,
                err_str != NULL ? err_str : "unknown error");
        mg_mgr_free(&mgr);
        exit(1);
    }

    mg_set_protocol_http_websocket(nc);
    s_http_server_opts.enable_directory_listing = "yes";

    printf("Starting RESTful server on port %s, serving %s\n",
           s_http_port,
           s_http_server_opts.document_root);

    /* Run until signal_handler() records a SIGTERM/SIGINT. */
    while (s_signal_received == 0)
    {
        mg_mgr_poll(&mgr, 1000);
    }

    /* Log here rather than inside the handler: only async-signal-safe work
     * may be done in signal context. */
    const int caught = (int)s_signal_received;
    LogError("signal_handler", "", caught, "Signal caught");

    mg_mgr_free(&mgr);
    return 0;
}

/// <summary>
/// Signal handler for SIGTERM/SIGINT. Re-installs itself and records the
/// signal number so the event loop in main() can shut down cleanly.
/// </summary>
/// <param name="sig_num">Number of the signal being delivered</param>
static void signal_handler(int sig_num)
{
    signal(sig_num, signal_handler); // Reinstantiate signal handler
    s_signal_received = sig_num;
}
/// Tests the MG_F_IS_WEBSOCKET bit of a connection's flags.
/// Returns the masked flag value: non-zero when the bit is set, 0 otherwise.
static int is_websocket(const struct mg_connection *nc)
{
    const unsigned long ws_bit = nc->flags & MG_F_IS_WEBSOCKET;
    return (int)ws_bit;
}
/// <summary>
/// Prints one log line to stdout in the form
/// "&lt;timestamp&gt; - &lt;ip:port&gt; &lt;message&gt;".
/// </summary>
/// <param name="nc">Connection whose socket address (nc->sa) is printed</param>
/// <param name="msg">Message text; need not be NUL-terminated, msg.len bytes are printed</param>
static void logit(struct mg_connection *nc, const struct mg_str msg)
{
    // Sized for a bracketed IPv6 address plus ":port"; 32 bytes was only
    // enough for IPv4.
    char addr[64] = {0};
    const std::string now = TIS_Util::Now();
    // A NULL pointer must not be handed to %s even with a zero precision.
    const char *text = (msg.p != NULL) ? msg.p : "";
    const int text_len = (msg.p != NULL) ? (int)msg.len : 0;

    mg_sock_addr_to_str(&nc->sa, addr, sizeof(addr),
                        MG_SOCK_STRINGIFY_IP | MG_SOCK_STRINGIFY_PORT);

    // Print directly instead of formatting into a fixed 1024-byte buffer,
    // which silently truncated long messages.
    printf("%s - %s %.*s\n", now.c_str(), addr, text_len, text); /* Local echo. */
}
/// <summary>
/// Replies to a websocket request by passing it to handle_it() once for
/// every websocket connection currently managed by <paramref name="mg"/>.
/// Non-websocket connections (e.g. the listener, plain HTTP) are skipped.
/// </summary>
/// <param name="mg">Connection manager to iterate; NULL is ignored</param>
/// <param name="wm">Received websocket message; NULL is ignored</param>
static void reply(struct mg_mgr *mg, websocket_message *wm)
{
    if (mg == NULL || wm == NULL)
    {
        return;
    }

    // Iterate the manager directly. Going through mg->active_connections->mgr
    // only led back to the same manager and dereferenced NULL when the
    // connection list was empty.
    for (struct mg_connection *c = mg_next(mg, NULL); c != NULL; c = mg_next(mg, c))
    {
        if (is_websocket(c))
        {
            handle_it(c, wm);
        }
    }
}
/// <summary>
/// Handles a websocket request: parses the frame payload as JSON, reads its
/// "Action" field and dispatches to the matching handler. Frames with any
/// other action are ignored.
/// </summary>
/// <param name="c">Connection the response is sent on</param>
/// <param name="wm">Received websocket message (payload in wm->data / wm->size)</param>
static void handle_it(struct mg_connection *c, websocket_message *wm)
{
    std::string msg((char *)wm->data, wm->size);
    std::map<std::string, std::string> map;
    TIS_Util::JsonToMap(msg, &map);

    // Only "Action" is needed for dispatch; the handlers re-read the other
    // fields from the message themselves.
    std::string Action = TIS_Util::GetValue("Action", &map);
    const int action = TIS_Util::ToInt(Action);

    switch (action)
    {
    case TakeThermalPictureBurstAsJson:
        HandleStreaming(c, wm);
        break;
    default:
        // Unsupported action: nothing to send back.
        break;
    }
}
/// <summary>
/// Event handler
/// </summary>
/// <param name="nc"></param>
/// <param name="ev"></param>
/// <param name="ev_data"></param>
static void ev_handler(struct mg_connection *nc, int ev, void *ev_data)
{
struct http_message *hm = (struct http_message *) ev_data;
switch (ev)
{
case MG_EV_CONNECT:
{
int status = *((int *)ev_data);
if (status != 0) {
printf("-- Connection error: %d\n", status);
}
break;
}
case MG_EV_WEBSOCKET_HANDSHAKE_DONE: {
struct http_message *hm = (struct http_message *) ev_data;
if (hm->resp_code == 0)
{
printf("-- Connected\n");
/* New websocket connection. Tell everybody. */
logit(nc, mg_mk_str("++ joined"));
}
else
{
printf("-- Connection failed! HTTP code %d\n", hm->resp_code);
/* Connection will be closed after this. */
}
break;
}
case MG_EV_WEBSOCKET_FRAME:
{
struct websocket_message *wm = (struct websocket_message *) ev_data;
/* New websocket message. Tell everybody. */
struct mg_str d = { (char *)wm->data, wm->size };
logit(nc, d);
reply(nc->mgr, wm);
break;
}
case MG_EV_CLOSE:
{
/* Disconnect. Tell everybody. */
if (is_websocket(nc))
{
logit(nc, mg_mk_str("-- left"));
}
break;
}
case MG_EV_HTTP_REQUEST:
if (mg_vcmp(&hm->uri, "/api/User/Authenticate") == 0)
{
HandleAuthenticate(nc, hm);
}
else if (mg_vcmp(&hm->uri, "/api/Site/TrainHistory") == 0)
{
HandleTrainHistory(nc, hm);
}
else if (mg_vcmp(&hm->uri, "/api/Site/Config") == 0)
{
HandleConfig(nc, hm);
}
else if (mg_vcmp(&hm->uri, "/api/Site/TakePicture") == 0)
{
HandleTakePicture(nc, hm);
}
else if (mg_vcmp(&hm->uri, "/api/Site/TakeBurst") == 0)
{
HandleTakeBurst(nc, hm);
}
else if (mg_vcmp(&hm->uri, "/api/Train/tcon") == 0)
{
HandleViewTcon(nc, hm);
}
else if (mg_vcmp(&hm->uri, "/api/Train/tdata") == 0)
{
HandleTdata(nc, hm);
}
else if (mg_vcmp(&hm->uri, "/api/Train/ttd") == 0)
{
HandleTtd(nc, hm);
}
else
{
mg_serve_http(nc, hm, s_http_server_opts); //Serve content from wwwroot
}
break;
default:
break;
}
}
/// <summary>
/// Handles /api/User/Authenticate web service call. Responds with
/// "200 OK" and the JSON result as a chunked body.
/// </summary>
/// <param name="nc">Connection to write the response to</param>
/// <param name="hm">Parsed HTTP request</param>
static void HandleAuthenticate(struct mg_connection *nc, struct http_message *hm)
{
    //Handling code deleted...
    mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n\r\n");
    // The payload is data, not a format string: a '%' inside the JSON must
    // not be interpreted by printf.
    mg_printf_http_chunk(nc, "%s", json.c_str());
    mg_send_http_chunk(nc, "", 0); /* Send empty chunk, the end of response */
}
/// <summary>
/// Train History web service. Responds with "200 OK" and the JSON result
/// as a chunked body.
/// </summary>
/// <param name="nc">Connection to write the response to</param>
/// <param name="hm">Parsed HTTP request</param>
static void HandleTrainHistory(struct mg_connection *nc, struct http_message *hm)
{
    //Handling code deleted...
    /* Send headers */
    mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n\r\n");
    /* Send the result back as a JSON object. It is passed as an argument to
     * "%s" so that a '%' in the data is never treated as a format directive. */
    mg_printf_http_chunk(nc, "%s", json.c_str());
    mg_send_http_chunk(nc, "", 0); /* Send empty chunk, the end of response */
}
/// <summary>
/// Configuration web service. GET returns the configuration as JSON,
/// POST takes the request body as JSON and answers with a result object,
/// any other method is answered with "400 Bad Request".
/// </summary>
/// <param name="nc">Connection to write the response to</param>
/// <param name="hm">Parsed HTTP request (method and body are read)</param>
static void HandleConfig(struct mg_connection *nc, struct http_message *hm)
{
    if (mg_vcmp(&hm->method, "GET") == 0)
    {
        //Handling code deleted...
        /* Send headers */
        mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n\r\n");
        /* Send the result back as a JSON object. It is passed as an argument
         * to "%s" so a '%' in the data is never treated as a format directive. */
        mg_printf_http_chunk(nc, "%s", json.c_str());
        mg_send_http_chunk(nc, "", 0); /* Send empty chunk, the end of response */
    }
    else if (mg_vcmp(&hm->method, "POST") == 0)
    {
        std::string json(hm->body.p, hm->body.len);
        std::string response = "Success";
        //Handling code deleted...
        mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n\r\n");
        mg_printf_http_chunk(nc, "{ \"result\": \"%s\" }", response.c_str());
        mg_send_http_chunk(nc, "", 0); /* Send empty chunk, the end of response */
    }
    else
    {
        mg_printf(nc, "%s", "HTTP/1.1 400 Bad Request\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n\r\n");
        mg_printf_http_chunk(nc, "{ \"result\": \"%s\" }", "Error");
        mg_send_http_chunk(nc, "", 0); /* Send empty chunk, the end of response */
    }
}
/// <summary>
/// Handles /api/Site/TakePicture web service call. Responds with
/// "200 OK" and the JSON result as a chunked body.
/// </summary>
/// <param name="nc">Connection to write the response to</param>
/// <param name="hm">Parsed HTTP request</param>
static void HandleTakePicture(struct mg_connection *nc, struct http_message *hm)
{
    //Handling code deleted...
    mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n\r\n");
    /* Send the result back as a JSON object. It is passed as an argument to
     * "%s" so that a '%' in the data is never treated as a format directive. */
    mg_printf_http_chunk(nc, "%s", json.c_str());
    mg_send_http_chunk(nc, "", 0); /* Send empty chunk, the end of response */
}
/// <summary>
/// Handles /api/Site/TakeBurst web service call. Responds with
/// "200 OK" and the JSON result as a chunked body.
/// </summary>
/// <param name="nc">Connection to write the response to</param>
/// <param name="hm">Parsed HTTP request</param>
static void HandleTakeBurst(struct mg_connection *nc, struct http_message *hm)
{
    //Handling code deleted...
    mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n\r\n");
    /* Send the result back as a JSON object. It is passed as an argument to
     * "%s" so that a '%' in the data is never treated as a format directive. */
    mg_printf_http_chunk(nc, "%s", json.c_str());
    mg_send_http_chunk(nc, "", 0); /* Send empty chunk, the end of response */
}
/// <summary>
/// Train Consists web service. Responds with "200 OK" and sends every
/// consist line as its own HTTP chunk, then releases the line list.
/// </summary>
/// <param name="nc">Connection to write the response to</param>
/// <param name="hm">Parsed HTTP request</param>
static void HandleViewTcon(struct mg_connection *nc, struct http_message *hm)
{
    //Handling code deleted...
    // Send headers
    mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/text\r\n\r\n");
    // Send data, one chunk per line
    for (auto it : tconLines)
    {
        tcon = it;
        // The line is data, not a format string: a '%' in it must not be
        // interpreted by printf.
        mg_printf_http_chunk(nc, "%s", tcon->GetLine());
    }
    mg_send_http_chunk(nc, "", 0); /* Send empty chunk, the end of response */
    ::DeleteList<TIS_Model::CTConModel>(&tconLines);
}
/// <summary>
/// Handles /api/Train/tdata web service call. The visible code path answers
/// with "400 Bad Request" and a JSON result explaining that the FileName
/// query string is missing.
/// </summary>
/// <param name="nc">Connection to write the response to</param>
/// <param name="hm">Parsed HTTP request</param>
static void HandleTdata(struct mg_connection *nc, struct http_message *hm)
{
    //Handling code deleted...
    static const char *const response_headers =
        "HTTP/1.1 400 Bad Request\r\n"
        "Transfer-Encoding: chunked\r\n"
        "Access-Control-Allow-Origin: *\r\n"
        "Content-Type: application/json\r\n"
        "\r\n";
    static const char *const reason = "FileName query string missing";

    mg_printf(nc, "%s", response_headers);
    mg_printf_http_chunk(nc, "{ \"result\": \"%s\" }", reason);
    // A zero-length chunk terminates the chunked response.
    mg_send_http_chunk(nc, "", 0);
}
/// <summary>
/// Handles /api/Train/ttd web service call. Responds with "200 OK" and the
/// JSON result as a chunked body.
/// </summary>
/// <param name="nc">Connection to write the response to</param>
/// <param name="hm">Parsed HTTP request</param>
static void HandleTtd(struct mg_connection *nc, struct http_message *hm)
{
    //Handling code deleted...
    mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nAccess-Control-Allow-Origin: *\r\nContent-Type: application/json\r\n\r\n");
    // The payload is data, not a format string: a '%' inside the JSON must
    // not be interpreted by printf.
    mg_printf_http_chunk(nc, "%s", json.c_str());
    mg_send_http_chunk(nc, "", 0); /* Send empty chunk, the end of response */
}
/// <summary>
/// Handles streaming images over web socket (for unit testing).
/// Reads CallbackId, Action, ImageCount, CameraNo and Enhanced from the JSON
/// request and queues ImageCount text frames on the connection. Each frame
/// is a JSON object echoing those fields plus an "Image" array of
/// TIS_IMAGE_STRUCT__IMAGE__LENGTH pseudo-random values in the range 0..255.
/// </summary>
/// <param name="c">Websocket connection the frames are queued on</param>
/// <param name="wm">Received websocket message carrying the JSON request</param>
static void HandleStreaming(struct mg_connection *c, websocket_message *wm)
{
    std::string msg((char *)wm->data, wm->size);
    std::map<std::string, std::string> map;
    TIS_Util::JsonToMap(msg, &map);

    std::string CallbackId = TIS_Util::GetValue("CallbackId", &map);
    std::string Action = TIS_Util::GetValue("Action", &map);
    std::string ImageCount = TIS_Util::GetValue("ImageCount", &map);
    std::string CameraNo = TIS_Util::GetValue("CameraNo", &map);
    std::string Enhanced = TIS_Util::GetValue("Enhanced", &map);
    const int count = TIS_Util::ToInt(ImageCount);

    // Everything before the image array is identical for every frame, so it
    // is built once instead of once per image.
    // NOTE(review): the request values are echoed without JSON escaping; a
    // '"' or '\' in them would produce invalid JSON.
    std::ostringstream head;
    head << "{ ";
    head << "\"CallbackId\" : \"" << CallbackId << "\",";
    head << "\"Action\" : \"" << Action << "\",";
    head << "\"Message\" : \"" << "Streaming" << "\",";
    head << "\"CameraNo\" : \"" << CameraNo << "\",";
    head << "\"Enhanced\" : \"" << Enhanced << "\",";
    head << "\"Image\" : ";
    const std::string frame_head = head.str();

    // NOTE(review): per the mongoose documentation, mg_send_websocket_frame()
    // only appends to the connection's send buffer; the socket is written by
    // mg_mgr_poll(), which cannot run again until this handler returns. All
    // frames produced by this loop therefore reach the browser together.
    // To deliver images one at a time, keep the remaining count in
    // per-connection state and emit one frame per MG_EV_POLL / MG_EV_SEND
    // event from the event handler instead of looping here.
    for (int i = 0; i < count; i++)
    {
        std::ostringstream ss;
        ss << frame_head << "[";
        for (int n = 0; n < TIS_IMAGE_STRUCT__IMAGE__LENGTH; n++)
        {
            if (n > 0)
            {
                ss << ",";
            }
            ss << rand() % 256;
        }
        ss << "]" << "}";

        const std::string json = ss.str();
        mg_send_websocket_frame(c, WEBSOCKET_OP_TEXT, json.c_str(), json.length());
        logit(c, mg_mk_str("-- message sent"));
    }
}