Skip to content

Commit

Permalink
exprerimental: remove String from client, delegate data management to…
Browse files Browse the repository at this point in the history
… modules
  • Loading branch information
mcspr committed Sep 19, 2019
1 parent 1653786 commit 3076bc9
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 25 deletions.
68 changes: 55 additions & 13 deletions code/espurna/libs/Http.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#define ASYNC_HTTP_DEBUG(...) //DEBUG_PORT.printf(__VA_ARGS__)
#endif

// TODO: customizable headers
// <method> <path> <host> <len>
const char HTTP_REQUEST_TEMPLATE[] PROGMEM =
"%s %s HTTP/1.1\r\n"
Expand Down Expand Up @@ -56,6 +57,11 @@ class AsyncHttp {

AsyncClient client;

enum cfg_t {
HTTP_SEND = 1 << 0,
HTTP_RECV = 1 << 1
};

enum class state_t : uint8_t {
NONE,
HEADERS,
Expand All @@ -66,8 +72,11 @@ class AsyncHttp {
using on_status_f = std::function<bool(AsyncHttp*, uint16_t status)>;
using on_disconnected_f = std::function<void(AsyncHttp*)>;
using on_error_f = std::function<void(AsyncHttp*, const AsyncHttpError&)>;
using on_body_f = std::function<void(AsyncHttp*, uint8_t*, size_t)>;

using on_body_recv_f = std::function<void(AsyncHttp*, uint8_t* data, size_t len)>;
using on_body_send_f = std::function<int(AsyncHttp*, AsyncClient* client)>;

int cfg = HTTP_RECV;
state_t state = state_t::NONE;
AsyncHttpError::error_t last_error;

Expand All @@ -77,22 +86,33 @@ class AsyncHttp {
on_status_f on_status;
on_error_f on_error;

on_body_f on_body;
on_body_recv_f on_body_recv;
on_body_send_f on_body_send;

String method;
String path;

String host;
uint16_t port;

String data; // TODO: generic data source, feed chunks of (bytes, len) and call us back when done

uint32_t ts;
uint32_t timeout = 5000;

bool connected = false;
bool connecting = false;

// TODO: since we are single threaded, no need to buffer anything and we can directly use client->add with anything right in the body_send callback
// buuut... this exposes asyncclient to the modules, maybe this needs a simple cbuf periodically flushing the data and this method simply filling it
// (ref: AsyncTCPBuffer class in ESPAsyncTCP or ESPAsyncWebServer chuncked response callback)
void trySend() {
if (!client.canSend()) return;
if (!on_body_send) {
client.close(true);
return;
}
on_body_send(this, &client);
}

protected:

static AsyncHttpError _timeoutError(AsyncHttpError::error_t error, const __FlashStringHelper* message, uint32_t ts) {
Expand All @@ -107,7 +127,6 @@ class AsyncHttp {
static void _onDisconnect(void* http_ptr, AsyncClient*) {
AsyncHttp* http = static_cast<AsyncHttp*>(http_ptr);
if (http->on_disconnected) http->on_disconnected(http);
http->data = "";
http->ts = 0;
http->connected = false;
http->connecting = false;
Expand All @@ -120,15 +139,14 @@ class AsyncHttp {
AsyncHttp* http = static_cast<AsyncHttp*>(http_ptr);
http->last_error = AsyncHttpError::NETWORK_TIMEOUT;
if (http->on_error) http->on_error(http, _timeoutError(AsyncHttpError::NETWORK_TIMEOUT, F("Network timeout after"), time));
// TODO: close connection when acks are missing?
}

static void _onPoll(void* http_ptr, AsyncClient*) {
static void _onPoll(void* http_ptr, AsyncClient* client) {
AsyncHttp* http = static_cast<AsyncHttp*>(http_ptr);
const auto diff = millis() - http->ts;
if (diff > http->timeout) {
if (http->on_error) http->on_error(http, _timeoutError(AsyncHttpError::REQUEST_TIMEOUT, F("No response after"), diff));
http->client.close(true);
client->close(true);
}
}

Expand Down Expand Up @@ -208,7 +226,7 @@ class AsyncHttp {
}
ASYNC_HTTP_DEBUG("ok | body len %u!\n", len);

if (http->on_body) http->on_body(http, (uint8_t*) response, len);
if (http->on_body_recv) http->on_body_recv(http, (uint8_t*) response, len);
return;
}
}
Expand All @@ -232,6 +250,18 @@ class AsyncHttp {
+ http->host.length()
+ http->path.length()
+ 32;

int data_len = 0;
if (http->cfg & HTTP_SEND) {
if (!http->on_body_send) {
ASYNC_HTTP_DEBUG("err | no send_body callback set\n");
client->close(true);
return;
}
// XXX: ...class instead of this multi-function?
data_len = http->on_body_send(http, nullptr);
}

char* headers = (char *) malloc(headers_len + 1);

if (!headers) {
Expand All @@ -245,7 +275,7 @@ class AsyncHttp {
http->method.c_str(),
http->path.c_str(),
http->host.c_str(),
http->data.length()
data_len
);
if (res >= (headers_len + 1)) {
ASYNC_HTTP_DEBUG("err | res>=len :: %u>=%u\n", res, headers_len + 1);
Expand All @@ -256,17 +286,21 @@ class AsyncHttp {

client->write(headers);
free(headers);
// TODO: streaming data source instead of using a simple String
// TODO: move to onPoll, ->add(data) and ->send() until it can't (returns 0), then repeat
client->write(http->data.c_str());

if (http->cfg & HTTP_SEND) http->trySend();
}

static void _onError(void* http_ptr, AsyncClient* client, err_t err) {
AsyncHttp* http = static_cast<AsyncHttp*>(http_ptr);
if (http->on_error) http->on_error(http, {AsyncHttpError::CLIENT_ERROR, client->errorToString(err)});
}

static void _onAck(void* http_ptr, AsyncClient* client, size_t, uint32_t) {
AsyncHttp* http = static_cast<AsyncHttp*>(http_ptr);
http->ts = millis();
if (http->cfg & HTTP_SEND) http->trySend();
}

public:
AsyncHttp() {
client.onDisconnect(_onDisconnect, this);
Expand All @@ -275,6 +309,7 @@ class AsyncHttp {
client.onData(_onData, this);
client.onConnect(_onConnect, this);
client.onError(_onError, this);
client.onAck(_onAck, this);
}
~AsyncHttp() = default;

Expand All @@ -290,6 +325,13 @@ class AsyncHttp {
this->path = path;
this->ts = millis();

// Treat every method as GET (receive-only), exception for POST / PUT to send data out
this->cfg = HTTP_RECV;
if (this->method.equals("POST") || this->method.equals("PUT")) {
if (!this->on_body_send) return false;
this->cfg = HTTP_SEND | HTTP_RECV;
}

bool status = false;

#if ASYNC_TCP_SSL_ENABLED
Expand Down
2 changes: 1 addition & 1 deletion code/espurna/ota_asynctcp.ino
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ void _otaClientFrom(const String& url) {
_ota_client->on_status = _otaOnStatus;
_ota_client->on_error = _otaOnError;

_ota_client->on_body = _otaClientOnBody;
_ota_client->on_body_recv = _otaClientOnBody;
}

#if ASYNC_TCP_SSL_ENABLED
Expand Down
36 changes: 27 additions & 9 deletions code/espurna/thinkspeak.ino
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ Copyright (C) 2019 by Xose Pérez <xose dot perez at gmail dot com>

#if THINGSPEAK_SUPPORT

#include "libs/Http.h"

#if THINGSPEAK_USE_ASYNC
#include <ESPAsyncTCP.h>
#include "libs/Http.h"
#else
#include <ESP8266WiFi.h>
#endif
Expand Down Expand Up @@ -95,13 +94,36 @@ void _tspkWebSocketOnConnected(JsonObject& root) {
#if THINGSPEAK_USE_ASYNC

AsyncHttp* _tspk_client = nullptr;
String _tspk_data;

void _tspkFlushAgain() {
DEBUG_MSG_P(PSTR("[THINGSPEAK] Re-enqueuing %u more time(s)\n"), _tspk_state.tries);
_tspk_state.flush = true;
}

void _tspkOnBody(AsyncHttp* http, uint8_t* data, size_t len) {
// TODO: maybe http object can keep a context containing the data
// however, it should not be restricted to string datatype
int _tspkOnBodySend(AsyncHttp* http, AsyncClient* client) {
if (!client) {
_tspk_data = _tspkPrepareData(_tspk_queue);
return _tspk_data.length();
}

const size_t data_len = _tspk_data.length();
if (!data_len || (client->space() < data_len)) {
return 0;
}

if (data_len == client->add(_tspk_data.c_str(), data_len)) {
DEBUG_MSG_P(PSTR("[THINGSPEAK] POST %s?%s\n"), http->path.c_str(), _tspk_data.c_str());
client->send();
_tspk_data = "";
}

return data_len;
}

void _tspkOnBodyRecv(AsyncHttp* http, uint8_t* data, size_t len) {

unsigned int code = 0;
if (len) {
Expand Down Expand Up @@ -154,11 +176,6 @@ void _tspkOnConnected(AsyncHttp* http) {
}
}
#endif

// Note: always replacing old data in case of retry
http->data = _tspkPrepareData(_tspk_queue);

DEBUG_MSG_P(PSTR("[THINGSPEAK] POST %s?%s\n"), http->path.c_str(), http->data.c_str());
}

constexpr const unsigned long THINGSPEAK_CLIENT_TIMEOUT = 5000;
Expand All @@ -174,7 +191,8 @@ void _tspkInitClient() {
_tspk_client->on_status = _tspkOnStatus;
_tspk_client->on_error = _tspkOnError;

_tspk_client->on_body = _tspkOnBody;
_tspk_client->on_body_recv = _tspkOnBodyRecv;
_tspk_client->on_body_send = _tspkOnBodySend;

}

Expand Down
4 changes: 2 additions & 2 deletions code/platformio.ini
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,8 @@ upload_flags = ${common.ota_upload_flags}
board = ${common.board_4m}
build_flags = ${common.build_flags_4m1m} -DNODEMCU_LOLIN -DDEBUG_FAUXMO=Serial -DNOWSAUTH

[env:nodemcu-lolin-252]
platform = ${common.arduino_core_2_5_2}
[env:nodemcu-lolin-git]
platform = ${common.arduino_core_git}
board = ${common.board_4m}
build_flags = ${common.build_flags_4m1m} -DNODEMCU_LOLIN -DDEBUG_FAUXMO=Serial -DNOWSAUTH

Expand Down

0 comments on commit 3076bc9

Please sign in to comment.