Web sockets and MQTT is not working at the same time

  1. My goal is: To receive the data from esp32 over web sockets and MQTT at the same time.

  2. My actions are: My code is written to get the data from ESP32 over web sockets and MQTT at the same time.

  3. The result I see is: I am successfully broadcasting the data from esp32 over web sockets(I can visualize it on the web page), but I cannot receive the MQTT at the same time.

When the data is broadcasting over web sockets, MQTT is not publishing the data.

  1. My expectation & question is:

Expectation: I should receive the data from esp32 over Web Sockets and MQTT at the same time.

Question: Why the MQTT Publisher is not publishing at the same time when web sockets are broadcasting the data.

Code:

#include "mgos.h"

#include "mgos_http_server.h"

#include "mgos_system.h"

#include "mgos_timers.h"

#include "mgos_mqtt.h"

#include <string.h>

//  Clock update interval.

#define INTERVAL_SECONDS 1

//  POSIX time related data structures.

time_t maintime;

uint32_t epochTime;

//  JSON message which is sent to web page using mg_printf_websocket_frame().

char *timeString = "{\"messageType\":\"serverTime\",\"serverTime\":%zu}";

//  Using defaults for http options in http response.

struct mg_serve_http_opts http_opts ={ .document_root = "." };

struct mg_connection *nc; //  Using defaults for http options in http response.

static void epoch_handler(struct mg_connection *nc, int ev, void *ev_data, void *user_data);

static void epoch_time_over_ws();

static void broadcast(struct mg_connection *nc);

// Timer and Mqtt Publisher

static void my_timer_cb(void *arg);

enum mgos_app_init_result mgos_app_init(void)

{

    struct mg_connection *nc;

    void *user_data = "";

    //  Get the server handle.

    if ((nc = mgos_get_sys_http_server()) == NULL)

    {

        puts("The value of nc is NULL");

    }

    //  Bind the event handler to the HTTP server.

    // mgos_register_http_endpoint("/", ev_handler, user_data);

    epoch_time_over_ws();

    // Registering the timer 

    mgos_set_timer(1000, MGOS_TIMER_REPEAT, my_timer_cb, NULL);

    return MGOS_APP_INIT_SUCCESS;

}

static void epoch_handler(struct mg_connection *nc, int ev, void *ev_data, void *user_data)

{

    struct http_message *hm = (struct http_message *)ev_data;

    switch (ev)

    {

    case MG_EV_HTTP_REQUEST:

    {

        mg_serve_http(nc, hm, http_opts);

        break;

    }

    case MG_EV_SEND:

    {

        // puts("MG_EV_SEND event fired!");

        break;

    }

    case MG_EV_WEBSOCKET_HANDSHAKE_DONE:

    {

        //  This sets the periodic time updating in motion.

        // mg_set_timer(nc, mg_time() + 1);

        break;

    }

    case MG_EV_WEBSOCKET_FRAME:

    {

        // printf("Websocket message received:\n");

        break;

    }

    case MG_EV_CLOSE:

    {

        break;

    }

    //  The timer event causes a JSON string to be sent to the

    //  webpage via Websocket.

    case MG_EV_TIMER:

    {

        // Intilizing the UNIX Time Stamp

        // maintime = time(0);

        // epochTime = (uint32_t)time(&maintime);

        // mg_printf_websocket_frame(nc, WEBSOCKET_OP_TEXT, timeString, epochTime, inCount, outCount, PeopleCount);

        //  A new timer is set.

        // mg_set_timer(nc, mg_time() + 1);

        break;

    }

    }

}

static void epoch_time_over_ws()

{

    void *user_data = "";

    //  Get the server handle.

    if ((nc = mgos_get_sys_http_server()) == NULL)

    {

        puts("The value of nc is NULL");

    }

    //  Bind the event handler to the HTTP server.

    mgos_register_http_endpoint("/", epoch_handler, user_data);

}

static void broadcast(struct mg_connection *nc)

{

    struct mg_connection *c;

    char addr[32];

    mg_sock_addr_to_str(&nc->sa, addr, sizeof(addr),

        MG_SOCK_STRINGIFY_IP | MG_SOCK_STRINGIFY_PORT);

    for (c = mg_next(nc->mgr, NULL); c != NULL; c = mg_next(nc->mgr, c))

    {

        if (c == nc)

        {

            continue;

            /* Don't send to the sender. */

            // mg_send_websocket_frame(c, WEBSOCKET_OP_TEXT, buf, strlen(buf));

        }

        else

        {

            maintime = time(0);

            epochTime = (uint32_t)time(&maintime);

            mg_printf_websocket_frame(c, WEBSOCKET_OP_TEXT, timeString, epochTime);

        }

    }

}

static void my_timer_cb(void *arg) {

    const char *publisherTopic = "/dwpcPub";

    const char *message = "{project: DWPC}";

    uint8_t messageLength = strlen(message);

    // broadcast(nc);

    mgos_mqtt_pub(publisherTopic, message, messageLength, 0, 0);

    printf("Mqtt publisher topic: %s and Mqtt Data: %s\n", publisherTopic, message);

    (void)arg;

}

mos.yml FIle:

platform: esp32

name: mjs-latest

author: mongoose-os

description: An empty app that does nothing

version: 1.0

libs_version: ${mos.version}

modules_version: ${mos.version}

mongoose_os_version: ${mos.version}

# Optional. List of tags for online search.

tags:

  - c

# List of files / directories with C sources. No slashes at the end of dir names.

sources:

  - src

# List of dirs. Files from these dirs will be copied to the device filesystem

filesystem:

  - fs

# Custom configuration entries, settable via "device configuration"

# Below is a custom firmware configuration example.

# Uncomment and modify according to your needs:

config_schema:

  # Config Schema for Wifi STA Setup

  - ["wifi.sta.enable", "b", true, { title: "Enable Station mode" }]

  - ["wifi.sta.ssid", "s", "ABCDEFGH", { title: "Wifi network name" }]

  - ["wifi.sta.pass", "s", "12345678", { title: "Password" }]

  - ["wifi.ap.enable", "b", false, { title: "Enable AP mode" }]

  # Config schema for NTP Server Parameters

  - ["sntp", "o", { title: "SNTP settings" }]

  - ["sntp.enable", "b", true, { title: "Enable SNTP" }]

  - ["sntp.server", "s", "time.google.com", { title: "Server address" }]

  - ["sntp.retry_min", "i", 1, { title: "Minimum retry interval" }]

  - ["sntp.retry_max", "i", 30, { title: "Maximum retry interval" }]

  - [

      "sntp.update_interval",

      "i",

      7200,

      { title: "Update interval. If 0, performs a one-off sync" },

    ]

  # Config schema for MQTT parameters

  - ["mqtt", "o", { title: "MQTT settings" }]

  - ["mqtt.enable", "b", true, { title: "Enable MQTT" }]

  - ["mqtt.server", "s", "192.168.0.91", { title: "MQTT server" }]

  - ["mqtt.keep_alive", "i", 60, { title: "Keep alive interval" }]

  - ["mqtt.clean_session", "b", true, { title: "Clean Session" }]

  - [

      "mqtt.max_qos",

      "i",

      0,

      { title: "Limit QoS of outgoing messages to at most this" },

    ]

# List of libraries used by this app, in order of initialisation

libs:

  - origin: https://github.com/mongoose-os-libs/ca-bundle

  - origin: https://github.com/mongoose-os-libs/rpc-service-config

  - origin: https://github.com/mongoose-os-libs/rpc-service-fs

  - origin: https://github.com/mongoose-os-libs/rpc-uart

  - origin: https://github.com/mongoose-os-libs/wifi

  - origin: https://github.com/mongoose-os-libs/http-server

  - origin: https://github.com/mongoose-os-libs/sntp

  - origin: https://github.com/mongoose-os-libs/mqtt

# Used by the mos tool to catch mos binaries incompatible with this file format

manifest_version: 2017-05-18

If I understand you correctly, perhaps a possible answer is that a websocket call is “a TCP send” call while an MQTT publish call in Mongoose-OS is a “queue this and send it” call, as this suggests.
Neither is realtime, though, as TCP is not.

Modify your broadcast function like this and call it in my_timer_cb.

static void broadcast(void) {
  struct mg_mgr *mgr = mgos_get_mgr();

  for (struct mg_connection *c = mg_next(mgr, NULL); c != NULL;
       c = mg_next(mgr, c)) {
    if (c->flags & MG_F_IS_WEBSOCKET) {
      time_t now = time(0);
      mg_printf_websocket_frame(c, WEBSOCKET_OP_TEXT, timeString, now);
      char addr[32];
      mg_sock_addr_to_str(&c->sa, addr, sizeof(addr),
                          MG_SOCK_STRINGIFY_IP | MG_SOCK_STRINGIFY_PORT);
      LOG(LL_INFO, ("c: %p, addr: %s", c, addr));
    }
  }
}

LE. Don’t redefine the config schema for the libraries you are using. Just set the values your application needs. E.g.

 - ["wifi.sta.enable", true]
 - ["wifi.sta.ssid", "ABCDEFGH"]
 - ["wifi.sta.pass", "12345678"]
 - ["wifi.ap.enable", false]

@nliviu Thank you for the quick replay, Now I achieved processing the data through Web sockets and MQTT at the same time.