Topics

mqtt_connect blocked at connect stage

Linux Weekend Learning
 

Hi Guys,

Please help me out here, the code is blocked internally when calling 'connect'

I wrote a code to connect to the Amazon Web Services IOT Service

/*
 * Copyright (c) 2019 Intel Corporation
 *
 * SPDX-License-Identifier: Apache-2.0
 */

#include <logging/log.h>
LOG_MODULE_REGISTER(aws_mqtt, LOG_LEVEL_DBG);

#include <zephyr.h>
#include <net/socket.h>
#include <net/mqtt.h>
#include <net/net_config.h>
#include <net/net_event.h>
#include <net/sntp.h>

#include <sys/printk.h>
#include <string.h>
#include <errno.h>
#include "dhcp.h"
#include <time.h>
#include <inttypes.h>
#include "test_certs.h"
#if defined(CONFIG_SOCKS)
static struct sockaddr socks5_proxy;
#endif
static struct pollfd fds[1];
static int nfds;

#define CONFIG_SAMPLE_CLOUD_AWS_PASSWORD ""
#define CONFIG_SAMPLE_CLOUD_AWS_USERNAME ""

#define APP_MQTT_BUFFER_SIZE    128
#define APP_CA_CERT_TAG 1
#define APP_PSK_TAG 2

static sec_tag_t m_sec_tags[] = {
#if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
APP_CA_CERT_TAG,
#endif
#if defined(MBEDTLS_KEY_EXCHANGE__SOME__PSK_ENABLED)
APP_PSK_TAG,
#endif
};

/* Buffers for MQTT client. */
static u8_t rx_buffer[APP_MQTT_BUFFER_SIZE];
static u8_t tx_buffer[APP_MQTT_BUFFER_SIZE];
static bool connected;

#define MQTT_CLIENTID           "zephyr_publisher"

struct zsock_addrinfo *haddr;
s64_t time_base;
static struct sockaddr_storage broker;
static struct mqtt_client client_ctx;

static void clear_fds(void)
{
nfds = 0;
}
time_t my_k_time(time_t *ptr)
{
s64_t stamp;
time_t now;

stamp = k_uptime_get();
now = (time_t)((stamp + time_base) / 1000);

if (ptr) {
*ptr = now;
}

return now;
}

static void broker_init(void)
{
struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker;

broker4->sin_family = AF_INET;
broker4->sin_port = htons(8883);

net_ipaddr_copy(&broker4->sin_addr,
&net_sin(haddr->ai_addr)->sin_addr);

#if defined(CONFIG_SOCKS)
struct sockaddr_in *proxy4 = (struct sockaddr_in *)&socks5_proxy;

proxy4->sin_family = AF_INET;
proxy4->sin_port = htons(1081);
inet_pton(AF_INET, "agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com", &proxy4->sin_addr);
#endif
}

void mqtt_evt_handler(struct mqtt_client *const client,
const struct mqtt_evt *evt)
{
int err;

switch (evt->type) {
case MQTT_EVT_CONNACK:
if (evt->result != 0) {
LOG_ERR("MQTT connect failed %d", evt->result);
break;
}

connected = true;
LOG_INF("MQTT client connected!");

break;

case MQTT_EVT_DISCONNECT:
LOG_INF("MQTT client disconnected %d", evt->result);

connected = false;
clear_fds();

break;

case MQTT_EVT_PUBACK:
if (evt->result != 0) {
LOG_ERR("MQTT PUBACK error %d", evt->result);
break;
}

LOG_INF("PUBACK packet id: %u", evt->param.puback.message_id);

break;

case MQTT_EVT_PUBREC:
if (evt->result != 0) {
LOG_ERR("MQTT PUBREC error %d", evt->result);
break;
}

LOG_INF("PUBREC packet id: %u", evt->param.pubrec.message_id);

const struct mqtt_pubrel_param rel_param = {
.message_id = evt->param.pubrec.message_id
};
err = mqtt_publish_qos2_release(client, &rel_param);
if (err != 0) {
LOG_ERR("Failed to send MQTT PUBREL: %d", err);
}

break;

case MQTT_EVT_PUBCOMP:
if (evt->result != 0) {
LOG_ERR("MQTT PUBCOMP error %d", evt->result);
break;
}

LOG_INF("PUBCOMP packet id: %u",
evt->param.pubcomp.message_id);

break;

default:
break;
}
}


static void client_init(struct mqtt_client *client)
{
        static struct mqtt_utf8 password;
        static struct mqtt_utf8 username;

mqtt_client_init(client);
broker_init();


password.utf8 = (u8_t *)CONFIG_SAMPLE_CLOUD_AWS_PASSWORD;
password.size = strlen(CONFIG_SAMPLE_CLOUD_AWS_PASSWORD);

client->password = &password;

username.utf8 = (u8_t *)CONFIG_SAMPLE_CLOUD_AWS_USERNAME;
username.size = strlen(CONFIG_SAMPLE_CLOUD_AWS_USERNAME);

client->user_name = &username;


/* MQTT client configuration */
client->broker = &broker;
client->evt_cb = mqtt_evt_handler;
client->client_id.utf8 = (u8_t *)MQTT_CLIENTID;
client->client_id.size = strlen(MQTT_CLIENTID);
client->password = NULL;
client->user_name = NULL;
client->protocol_version = MQTT_VERSION_3_1_1;

/* MQTT buffers configuration */
client->rx_buf = rx_buffer;
client->rx_buf_size = sizeof(rx_buffer);
client->tx_buf = tx_buffer;
client->tx_buf_size = sizeof(tx_buffer);

client->transport.type = MQTT_TRANSPORT_SECURE;

struct mqtt_sec_config *tls_config = &client->transport.tls.config;

tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED;
tls_config->cipher_list = NULL;
tls_config->sec_tag_list = m_sec_tags;
tls_config->sec_tag_count = ARRAY_SIZE(m_sec_tags);
tls_config->hostname = "agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com";
client->transport.type = MQTT_TRANSPORT_SECURE;
#if defined(CONFIG_SOCKS)
mqtt_client_set_proxy(client, &socks5_proxy,
socks5_proxy.sa_family == AF_INET ?
sizeof(struct sockaddr_in) :
sizeof(struct sockaddr_in6));
#endif


}
#define RC_STR(rc) ((rc) == 0 ? "OK" : "ERROR")
#define PRINT_RESULT(func, rc) \
LOG_INF("%s: %d <%s>", (func), rc, RC_STR(rc))
static void prepare_fds(struct mqtt_client *client)
{
if (client->transport.type == MQTT_TRANSPORT_NON_SECURE) {
fds[0].fd = client->transport.tcp.sock;
}
#if defined(CONFIG_MQTT_LIB_TLS)
else if (client->transport.type == MQTT_TRANSPORT_SECURE) {
fds[0].fd = client->transport.tls.sock;
}
#endif

fds[0].events = ZSOCK_POLLIN;
nfds = 1;
}

static void wait(int timeout)
{
if (nfds > 0) {
if (poll(fds, nfds, timeout) < 0) {
LOG_ERR("poll error: %d", errno);
}
}
}



static int try_to_connect(struct mqtt_client *client)
{
u8_t retries = 3U;
int rc;

LOG_DBG("attempting to connect...");

client_init(client);
rc = mqtt_connect(client);
if (rc != 0) {
PRINT_RESULT("mqtt_connect", rc);
k_sleep(500);
} else {

prepare_fds(client);

wait(500);
mqtt_input(client);
if (!connected) {
LOG_INF("Aborting connection\n");
mqtt_abort(client);
}

}
if (connected) {
return 0;
}

return -EINVAL;

}


static int tls_init(void)
{
        int err = -EINVAL;
LOG_INF("Ca size:%d\n", sizeof(ca_certificate));
        err = tls_credential_add(APP_CA_CERT_TAG, TLS_CREDENTIAL_CA_CERTIFICATE,
                                 ca_certificate, sizeof(ca_certificate));
        if (err < 0) {
                LOG_ERR("Failed to register public certificate: %d", err);
                return err;
        }


        return err;
}


void mqtt_startup(char *hostname, int port)
{
struct mqtt_client *client = &client_ctx;
int retries = 5;
int err, cnt;
static struct zsock_addrinfo hints;
int res = 0;
int rc;




hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = 0;
cnt = 0;
while ((err = getaddrinfo("agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com", "8883", &hints,
&haddr)) && cnt < 3) {
LOG_ERR("Unable to get address for broker, retrying");
cnt++;
}
if (err != 0) {
LOG_ERR("Unable to get address for broker, error %d",
res);
return;
}
LOG_INF("DNS resolved for agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com:8833");
try_to_connect(client);
PRINT_RESULT("try_to_connect", rc);




}

static void show_addrinfo(struct addrinfo *addr)
{
char hr_addr[NET_IPV6_ADDR_LEN];
void *a;

top:
LOG_DBG("  flags   : %d", addr->ai_flags);
LOG_DBG("  family  : %d", addr->ai_family);
LOG_DBG("  socktype: %d", addr->ai_socktype);
LOG_DBG("  protocol: %d", addr->ai_protocol);
LOG_DBG("  addrlen : %d", (int)addr->ai_addrlen);

/* Assume two words. */
LOG_DBG("   addr[0]: 0x%lx", ((uint32_t *)addr->ai_addr)[0]);
LOG_DBG("   addr[1]: 0x%lx", ((uint32_t *)addr->ai_addr)[1]);

if (addr->ai_next != 0) {
addr = addr->ai_next;
goto top;
}

a = &net_sin(addr->ai_addr)->sin_addr;
LOG_INF("  Got %s",
log_strdup(net_addr_ntop(addr->ai_family, a,
hr_addr, sizeof(hr_addr))));

}

void do_sntp(struct addrinfo *addr)
{
struct sntp_ctx ctx;
int rc;
s64_t stamp;
struct sntp_time sntp_time;
char time_str[sizeof("1970-01-01T00:00:00")];

LOG_INF("Sending NTP request for current time:");

/* Initialize sntp */
rc = sntp_init(&ctx, addr->ai_addr, sizeof(struct sockaddr_in));
if (rc < 0) {
LOG_ERR("Unable to init sntp context: %d", rc);
return;
}
rc = sntp_query(&ctx, K_FOREVER, &sntp_time);
if (rc == 0) {
stamp = k_uptime_get();
time_base = sntp_time.seconds * MSEC_PER_SEC - stamp;

/* Convert time to make sure. */
time_t now = sntp_time.seconds;
struct tm now_tm;

gmtime_r(&now, &now_tm);
strftime(time_str, sizeof(time_str), "%FT%T", &now_tm);
LOG_INF("  Acquired time: %s", log_strdup(time_str));

} else {
LOG_ERR("  Failed to acquire SNTP, code %d\n", rc);
}

sntp_close(&ctx);
}


void main(void)
{
static struct addrinfo hints;
struct addrinfo *haddr;
int res;
int cnt = 0;
        int rc;


app_dhcpv4_startup();

LOG_INF("Should have DHCPv4 lease at this point.");
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = 0;
//while ((res = getaddrinfo("console.aws.amazon.com", "80", &hints,
while ((res = getaddrinfo("time.google.com", "123", &hints,
&haddr)) && cnt < 3) {
LOG_ERR("Unable to get address for NTP server, retrying");
cnt++;
}

if (res != 0) {
LOG_ERR("Unable to get address of NTP server, exiting %d", res);
return;
}


LOG_INF("DNS resolved for console.aws.amazon.com:80");
show_addrinfo(haddr);
do_sntp(haddr);

#if defined(CONFIG_MQTT_LIB_TLS)

        rc = tls_init();
        PRINT_RESULT("tls_init", rc);
#endif

mqtt_startup("agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com", 8883);

}

Console Logs:

[00:00:09.666,000] <inf> aws_mqtt: tls_init: 0 <OK>
[00:00:09.706,000] <inf> aws_mqtt: DNS resolved for agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com:8833
[00:00:09.718,000] <dbg> aws_mqtt.try_to_connect: attempting to connect...
[00:00:09.727,000] <dbg> net_mqtt_sock_tls.mqtt_client_tls_connect: (0x20401bac): Created socket 0
[00:00:09.739,000] <dbg> net_mqtt_sock_tls.mqtt_client_tls_connect: (0x20401bac): Before Connect completed

Jukka Rissanen
 

Hi,

some comments inline below:

On Sun, 2020-03-01 at 10:01 +0530, Linux Weekend Learning wrote:

static void broker_init(void)
{
struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker;

broker4->sin_family = AF_INET;
broker4->sin_port = htons(8883);

net_ipaddr_copy(&broker4->sin_addr,
&net_sin(haddr->ai_addr)->sin_addr);

#if defined(CONFIG_SOCKS)
struct sockaddr_in *proxy4 = (struct sockaddr_in *)&socks5_proxy;

proxy4->sin_family = AF_INET;
proxy4->sin_port = htons(1081);
inet_pton(AF_INET, "agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com",
&proxy4->sin_addr);
Use IP address here instead of hostname.


#endif
}

void mqtt_evt_handler(struct mqtt_client *const client,
const struct mqtt_evt *evt)
{
int err;

switch (evt->type) {
case MQTT_EVT_CONNACK:
if (evt->result != 0) {
LOG_ERR("MQTT connect failed %d", evt->result);
break;
}

connected = true;
LOG_INF("MQTT client connected!");

break;

case MQTT_EVT_DISCONNECT:
LOG_INF("MQTT client disconnected %d", evt->result);

connected = false;
clear_fds();

break;

case MQTT_EVT_PUBACK:
if (evt->result != 0) {
LOG_ERR("MQTT PUBACK error %d", evt->result);
break;
}

LOG_INF("PUBACK packet id: %u", evt->param.puback.message_id);

break;

case MQTT_EVT_PUBREC:
if (evt->result != 0) {
LOG_ERR("MQTT PUBREC error %d", evt->result);
break;
}

LOG_INF("PUBREC packet id: %u", evt->param.pubrec.message_id);

const struct mqtt_pubrel_param rel_param = {
.message_id = evt->param.pubrec.message_id
};
err = mqtt_publish_qos2_release(client, &rel_param);
if (err != 0) {
LOG_ERR("Failed to send MQTT PUBREL: %d", err);
}

break;

case MQTT_EVT_PUBCOMP:
if (evt->result != 0) {
LOG_ERR("MQTT PUBCOMP error %d", evt->result);
break;
}

LOG_INF("PUBCOMP packet id: %u",
evt->param.pubcomp.message_id);

break;

default:
break;
}
}


static void client_init(struct mqtt_client *client)
{
static struct mqtt_utf8 password;
static struct mqtt_utf8 username;

mqtt_client_init(client);
broker_init();


password.utf8 = (u8_t *)CONFIG_SAMPLE_CLOUD_AWS_PASSWORD;
password.size = strlen(CONFIG_SAMPLE_CLOUD_AWS_PASSWORD);

client->password = &password;

username.utf8 = (u8_t *)CONFIG_SAMPLE_CLOUD_AWS_USERNAME;
username.size = strlen(CONFIG_SAMPLE_CLOUD_AWS_USERNAME);

client->user_name = &username;


/* MQTT client configuration */
client->broker = &broker;
client->evt_cb = mqtt_evt_handler;
client->client_id.utf8 = (u8_t *)MQTT_CLIENTID;
client->client_id.size = strlen(MQTT_CLIENTID);
client->password = NULL;
client->user_name = NULL;
client->protocol_version = MQTT_VERSION_3_1_1;

/* MQTT buffers configuration */
client->rx_buf = rx_buffer;
client->rx_buf_size = sizeof(rx_buffer);
client->tx_buf = tx_buffer;
client->tx_buf_size = sizeof(tx_buffer);

client->transport.type = MQTT_TRANSPORT_SECURE;

struct mqtt_sec_config *tls_config = &client->transport.tls.config;

tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED;
tls_config->cipher_list = NULL;
tls_config->sec_tag_list = m_sec_tags;
tls_config->sec_tag_count = ARRAY_SIZE(m_sec_tags);
tls_config->hostname = "agwgaprlb6xdl-ats.iot.us-west-
2.amazonaws.com";
client->transport.type = MQTT_TRANSPORT_SECURE;
#if defined(CONFIG_SOCKS)
mqtt_client_set_proxy(client, &socks5_proxy,
socks5_proxy.sa_family == AF_INET ?
sizeof(struct sockaddr_in) :
sizeof(struct sockaddr_in6));
#endif


}
#define RC_STR(rc) ((rc) == 0 ? "OK" : "ERROR")
#define PRINT_RESULT(func, rc) \
LOG_INF("%s: %d <%s>", (func), rc, RC_STR(rc))
static void prepare_fds(struct mqtt_client *client)
{
if (client->transport.type == MQTT_TRANSPORT_NON_SECURE) {
fds[0].fd = client->transport.tcp.sock;
}
#if defined(CONFIG_MQTT_LIB_TLS)
else if (client->transport.type == MQTT_TRANSPORT_SECURE) {
fds[0].fd = client->transport.tls.sock;
}
#endif

fds[0].events = ZSOCK_POLLIN;
nfds = 1;
}

static void wait(int timeout)
{
if (nfds > 0) {
if (poll(fds, nfds, timeout) < 0) {
LOG_ERR("poll error: %d", errno);
}
}
}



static int try_to_connect(struct mqtt_client *client)
{
u8_t retries = 3U;
int rc;

LOG_DBG("attempting to connect...");
You could print the IP address you are trying to connect in order to
verify that it is set correctly.


client_init(client);
rc = mqtt_connect(client);
if (rc != 0) {
PRINT_RESULT("mqtt_connect", rc);
k_sleep(500);
} else {

prepare_fds(client);

wait(500);
mqtt_input(client);
if (!connected) {
LOG_INF("Aborting connection\n");
mqtt_abort(client);
}

}
if (connected) {
return 0;
}

return -EINVAL;

}


static int tls_init(void)
{
int err = -EINVAL;
LOG_INF("Ca size:%d\n", sizeof(ca_certificate));
err = tls_credential_add(APP_CA_CERT_TAG,
TLS_CREDENTIAL_CA_CERTIFICATE,
ca_certificate,
sizeof(ca_certificate));
if (err < 0) {
LOG_ERR("Failed to register public certificate: %d",
err);
return err;
}


return err;
}


void mqtt_startup(char *hostname, int port)
{
struct mqtt_client *client = &client_ctx;
int retries = 5;
int err, cnt;
static struct zsock_addrinfo hints;
int res = 0;
int rc;




hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = 0;
cnt = 0;
while ((err = getaddrinfo("agwgaprlb6xdl-ats.iot.us-west-
2.amazonaws.com", "8883", &hints,
&haddr)) && cnt < 3) {
LOG_ERR("Unable to get address for broker, retrying");
cnt++;
}
if (err != 0) {
LOG_ERR("Unable to get address for broker, error %d",
res);
return;
}
LOG_INF("DNS resolved for agwgaprlb6xdl-ats.iot.us-west-
2.amazonaws.com:8833");
try_to_connect(client);
PRINT_RESULT("try_to_connect", rc);




}

static void show_addrinfo(struct addrinfo *addr)
{
char hr_addr[NET_IPV6_ADDR_LEN];
void *a;

top:
LOG_DBG(" flags : %d", addr->ai_flags);
LOG_DBG(" family : %d", addr->ai_family);
LOG_DBG(" socktype: %d", addr->ai_socktype);
LOG_DBG(" protocol: %d", addr->ai_protocol);
LOG_DBG(" addrlen : %d", (int)addr->ai_addrlen);

/* Assume two words. */
LOG_DBG(" addr[0]: 0x%lx", ((uint32_t *)addr->ai_addr)[0]);
LOG_DBG(" addr[1]: 0x%lx", ((uint32_t *)addr->ai_addr)[1]);

if (addr->ai_next != 0) {
addr = addr->ai_next;
goto top;
}

a = &net_sin(addr->ai_addr)->sin_addr;
LOG_INF(" Got %s",
log_strdup(net_addr_ntop(addr->ai_family, a,
hr_addr, sizeof(hr_addr))));

}

void do_sntp(struct addrinfo *addr)
{
struct sntp_ctx ctx;
int rc;
s64_t stamp;
struct sntp_time sntp_time;
char time_str[sizeof("1970-01-01T00:00:00")];

LOG_INF("Sending NTP request for current time:");

/* Initialize sntp */
rc = sntp_init(&ctx, addr->ai_addr, sizeof(struct sockaddr_in));
if (rc < 0) {
LOG_ERR("Unable to init sntp context: %d", rc);
return;
}
rc = sntp_query(&ctx, K_FOREVER, &sntp_time);
if (rc == 0) {
stamp = k_uptime_get();
time_base = sntp_time.seconds * MSEC_PER_SEC - stamp;

/* Convert time to make sure. */
time_t now = sntp_time.seconds;
struct tm now_tm;

gmtime_r(&now, &now_tm);
strftime(time_str, sizeof(time_str), "%FT%T", &now_tm);
LOG_INF(" Acquired time: %s", log_strdup(time_str));

} else {
LOG_ERR(" Failed to acquire SNTP, code %d\n", rc);
}

sntp_close(&ctx);
}


void main(void)
{
static struct addrinfo hints;
struct addrinfo *haddr;
int res;
int cnt = 0;
int rc;


app_dhcpv4_startup();

LOG_INF("Should have DHCPv4 lease at this point.");
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_DGRAM;
hints.ai_protocol = 0;
//while ((res = getaddrinfo("console.aws.amazon.com", "80", &hints,
while ((res = getaddrinfo("time.google.com", "123", &hints,
&haddr)) && cnt < 3) {
LOG_ERR("Unable to get address for NTP server, retrying");
cnt++;
}

if (res != 0) {
LOG_ERR("Unable to get address of NTP server, exiting %d", res);
return;
}


LOG_INF("DNS resolved for console.aws.amazon.com:80");
show_addrinfo(haddr);
do_sntp(haddr);

#if defined(CONFIG_MQTT_LIB_TLS)

rc = tls_init();
PRINT_RESULT("tls_init", rc);
#endif

mqtt_startup("agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com", 8883);

}

Console Logs:

[00:00:09.666,000] <inf> aws_mqtt: tls_init: 0 <OK>
[00:00:09.706,000] <inf> aws_mqtt: DNS resolved for agwgaprlb6xdl-
ats.iot.us-west-2.amazonaws.com:8833
[00:00:09.718,000] <dbg> aws_mqtt.try_to_connect: attempting to
connect...
[00:00:09.727,000] <dbg> net_mqtt_sock_tls.mqtt_client_tls_connect:
(0x20401bac): Created socket 0
[00:00:09.739,000] <dbg> net_mqtt_sock_tls.mqtt_client_tls_connect:
(0x20401bac): Before Connect completed
_._,_._,_

Cheers,
Jukka