mqtt_connect blocked at connect stage
Linux Weekend Learning
Hi Guys, Please help me out here, the code is blocked internally when calling 'connect' I wrote a code to connect to the Amazon Web Services IOT Service /* * Copyright (c) 2019 Intel Corporation * * SPDX-License-Identifier: Apache-2.0 */ #include <logging/log.h> LOG_MODULE_REGISTER(aws_mqtt, LOG_LEVEL_DBG); #include <zephyr.h> #include <net/socket.h> #include <net/mqtt.h> #include <net/net_config.h> #include <net/net_event.h> #include <net/sntp.h> #include <sys/printk.h> #include <string.h> #include <errno.h> #include "dhcp.h" #include <time.h> #include <inttypes.h> #include "test_certs.h" #if defined(CONFIG_SOCKS) static struct sockaddr socks5_proxy; #endif static struct pollfd fds[1]; static int nfds; #define CONFIG_SAMPLE_CLOUD_AWS_PASSWORD "" #define CONFIG_SAMPLE_CLOUD_AWS_USERNAME "" #define APP_MQTT_BUFFER_SIZE 128 #define APP_CA_CERT_TAG 1 #define APP_PSK_TAG 2 static sec_tag_t m_sec_tags[] = { #if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD) APP_CA_CERT_TAG, #endif #if defined(MBEDTLS_KEY_EXCHANGE__SOME__PSK_ENABLED) APP_PSK_TAG, #endif }; /* Buffers for MQTT client. */ static u8_t rx_buffer[APP_MQTT_BUFFER_SIZE]; static u8_t tx_buffer[APP_MQTT_BUFFER_SIZE]; static bool connected; #define MQTT_CLIENTID "zephyr_publisher" struct zsock_addrinfo *haddr; s64_t time_base; static struct sockaddr_storage broker; static struct mqtt_client client_ctx; static void clear_fds(void) { nfds = 0; } time_t my_k_time(time_t *ptr) { s64_t stamp; time_t now; stamp = k_uptime_get(); now = (time_t)((stamp + time_base) / 1000); if (ptr) { *ptr = now; } return now; } static void broker_init(void) { struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker; broker4->sin_family = AF_INET; broker4->sin_port = htons(8883); net_ipaddr_copy(&broker4->sin_addr, &net_sin(haddr->ai_addr)->sin_addr); #if defined(CONFIG_SOCKS) struct sockaddr_in *proxy4 = (struct sockaddr_in *)&socks5_proxy; proxy4->sin_family = AF_INET; proxy4->sin_port = htons(1081); inet_pton(AF_INET, "agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com", &proxy4->sin_addr); #endif } void mqtt_evt_handler(struct mqtt_client *const client, const struct mqtt_evt *evt) { int err; switch (evt->type) { case MQTT_EVT_CONNACK: if (evt->result != 0) { LOG_ERR("MQTT connect failed %d", evt->result); break; } connected = true; LOG_INF("MQTT client connected!"); break; case MQTT_EVT_DISCONNECT: LOG_INF("MQTT client disconnected %d", evt->result); connected = false; clear_fds(); break; case MQTT_EVT_PUBACK: if (evt->result != 0) { LOG_ERR("MQTT PUBACK error %d", evt->result); break; } LOG_INF("PUBACK packet id: %u", evt->param.puback.message_id); break; case MQTT_EVT_PUBREC: if (evt->result != 0) { LOG_ERR("MQTT PUBREC error %d", evt->result); break; } LOG_INF("PUBREC packet id: %u", evt->param.pubrec.message_id); const struct mqtt_pubrel_param rel_param = { .message_id = evt->param.pubrec.message_id }; err = mqtt_publish_qos2_release(client, &rel_param); if (err != 0) { LOG_ERR("Failed to send MQTT PUBREL: %d", err); } break; case MQTT_EVT_PUBCOMP: if (evt->result != 0) { LOG_ERR("MQTT PUBCOMP error %d", evt->result); break; } LOG_INF("PUBCOMP packet id: %u", evt->param.pubcomp.message_id); break; default: break; } } static void client_init(struct mqtt_client *client) { static struct mqtt_utf8 password; static struct mqtt_utf8 username; mqtt_client_init(client); broker_init(); password.utf8 = (u8_t *)CONFIG_SAMPLE_CLOUD_AWS_PASSWORD; password.size = strlen(CONFIG_SAMPLE_CLOUD_AWS_PASSWORD); client->password = &password; username.utf8 = (u8_t *)CONFIG_SAMPLE_CLOUD_AWS_USERNAME; username.size = strlen(CONFIG_SAMPLE_CLOUD_AWS_USERNAME); client->user_name = &username; /* MQTT client configuration */ client->broker = &broker; client->evt_cb = mqtt_evt_handler; client->client_id.utf8 = (u8_t *)MQTT_CLIENTID; client->client_id.size = strlen(MQTT_CLIENTID); client->password = NULL; client->user_name = NULL; client->protocol_version = MQTT_VERSION_3_1_1; /* MQTT buffers configuration */ client->rx_buf = rx_buffer; client->rx_buf_size = sizeof(rx_buffer); client->tx_buf = tx_buffer; client->tx_buf_size = sizeof(tx_buffer); client->transport.type = MQTT_TRANSPORT_SECURE; struct mqtt_sec_config *tls_config = &client->transport.tls.config; tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED; tls_config->cipher_list = NULL; tls_config->sec_tag_list = m_sec_tags; tls_config->sec_tag_count = ARRAY_SIZE(m_sec_tags); tls_config->hostname = "agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com"; client->transport.type = MQTT_TRANSPORT_SECURE; #if defined(CONFIG_SOCKS) mqtt_client_set_proxy(client, &socks5_proxy, socks5_proxy.sa_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)); #endif } #define RC_STR(rc) ((rc) == 0 ? "OK" : "ERROR") #define PRINT_RESULT(func, rc) \ LOG_INF("%s: %d <%s>", (func), rc, RC_STR(rc)) static void prepare_fds(struct mqtt_client *client) { if (client->transport.type == MQTT_TRANSPORT_NON_SECURE) { fds[0].fd = client->transport.tcp.sock; } #if defined(CONFIG_MQTT_LIB_TLS) else if (client->transport.type == MQTT_TRANSPORT_SECURE) { fds[0].fd = client->transport.tls.sock; } #endif fds[0].events = ZSOCK_POLLIN; nfds = 1; } static void wait(int timeout) { if (nfds > 0) { if (poll(fds, nfds, timeout) < 0) { LOG_ERR("poll error: %d", errno); } } } static int try_to_connect(struct mqtt_client *client) { u8_t retries = 3U; int rc; LOG_DBG("attempting to connect..."); client_init(client); rc = mqtt_connect(client); if (rc != 0) { PRINT_RESULT("mqtt_connect", rc); k_sleep(500); } else { prepare_fds(client); wait(500); mqtt_input(client); if (!connected) { LOG_INF("Aborting connection\n"); mqtt_abort(client); } } if (connected) { return 0; } return -EINVAL; } static int tls_init(void) { int err = -EINVAL; LOG_INF("Ca size:%d\n", sizeof(ca_certificate)); err = tls_credential_add(APP_CA_CERT_TAG, TLS_CREDENTIAL_CA_CERTIFICATE, ca_certificate, sizeof(ca_certificate)); if (err < 0) { LOG_ERR("Failed to register public certificate: %d", err); return err; } return err; } void mqtt_startup(char *hostname, int port) { struct mqtt_client *client = &client_ctx; int retries = 5; int err, cnt; static struct zsock_addrinfo hints; int res = 0; int rc; hints.ai_family = AF_INET; hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = 0; cnt = 0; while ((err = getaddrinfo("agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com", "8883", &hints, &haddr)) && cnt < 3) { LOG_ERR("Unable to get address for broker, retrying"); cnt++; } if (err != 0) { LOG_ERR("Unable to get address for broker, error %d", res); return; } LOG_INF("DNS resolved for agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com:8833"); try_to_connect(client); PRINT_RESULT("try_to_connect", rc); } static void show_addrinfo(struct addrinfo *addr) { char hr_addr[NET_IPV6_ADDR_LEN]; void *a; top: LOG_DBG(" flags : %d", addr->ai_flags); LOG_DBG(" family : %d", addr->ai_family); LOG_DBG(" socktype: %d", addr->ai_socktype); LOG_DBG(" protocol: %d", addr->ai_protocol); LOG_DBG(" addrlen : %d", (int)addr->ai_addrlen); /* Assume two words. */ LOG_DBG(" addr[0]: 0x%lx", ((uint32_t *)addr->ai_addr)[0]); LOG_DBG(" addr[1]: 0x%lx", ((uint32_t *)addr->ai_addr)[1]); if (addr->ai_next != 0) { addr = addr->ai_next; goto top; } a = &net_sin(addr->ai_addr)->sin_addr; LOG_INF(" Got %s", log_strdup(net_addr_ntop(addr->ai_family, a, hr_addr, sizeof(hr_addr)))); } void do_sntp(struct addrinfo *addr) { struct sntp_ctx ctx; int rc; s64_t stamp; struct sntp_time sntp_time; char time_str[sizeof("1970-01-01T00:00:00")]; LOG_INF("Sending NTP request for current time:"); /* Initialize sntp */ rc = sntp_init(&ctx, addr->ai_addr, sizeof(struct sockaddr_in)); if (rc < 0) { LOG_ERR("Unable to init sntp context: %d", rc); return; } rc = sntp_query(&ctx, K_FOREVER, &sntp_time); if (rc == 0) { stamp = k_uptime_get(); time_base = sntp_time.seconds * MSEC_PER_SEC - stamp; /* Convert time to make sure. */ time_t now = sntp_time.seconds; struct tm now_tm; gmtime_r(&now, &now_tm); strftime(time_str, sizeof(time_str), "%FT%T", &now_tm); LOG_INF(" Acquired time: %s", log_strdup(time_str)); } else { LOG_ERR(" Failed to acquire SNTP, code %d\n", rc); } sntp_close(&ctx); } void main(void) { static struct addrinfo hints; struct addrinfo *haddr; int res; int cnt = 0; int rc; app_dhcpv4_startup(); LOG_INF("Should have DHCPv4 lease at this point."); hints.ai_family = AF_INET; hints.ai_socktype = SOCK_DGRAM; hints.ai_protocol = 0; //while ((res = getaddrinfo("console.aws.amazon.com", "80", &hints, while ((res = getaddrinfo("time.google.com", "123", &hints, &haddr)) && cnt < 3) { LOG_ERR("Unable to get address for NTP server, retrying"); cnt++; } if (res != 0) { LOG_ERR("Unable to get address of NTP server, exiting %d", res); return; } LOG_INF("DNS resolved for console.aws.amazon.com:80"); show_addrinfo(haddr); do_sntp(haddr); #if defined(CONFIG_MQTT_LIB_TLS) rc = tls_init(); PRINT_RESULT("tls_init", rc); #endif mqtt_startup("agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com", 8883); } Console Logs: [00:00:09.666,000] <inf> aws_mqtt: tls_init: 0 <OK> [00:00:09.706,000] <inf> aws_mqtt: DNS resolved for agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com:8833 [00:00:09.718,000] <dbg> aws_mqtt.try_to_connect: attempting to connect... [00:00:09.727,000] <dbg> net_mqtt_sock_tls.mqtt_client_tls_connect: (0x20401bac): Created socket 0 [00:00:09.739,000] <dbg> net_mqtt_sock_tls.mqtt_client_tls_connect: (0x20401bac): Before Connect completed
|
|