Add streaming support.

This commit is contained in:
tastytea 2020-01-08 21:27:27 +01:00
parent a724006854
commit d2de78ff9e
Signed by: tastytea
GPG Key ID: CFC39497F1B26E07
6 changed files with 118 additions and 6 deletions

View File

@ -42,7 +42,7 @@ Have a look at the link:{uri-reference}[reference].
* Tested OS: Linux
* C++ compiler (tested: link:{uri-gcc}[GCC] 7/8/9, link:{uri-lang}[clang] 6/7)
* link:{uri-cmake}[CMake] (at least: 3.9)
* link:{uri-curl}[curl] (tested: 7.66 / 7.58)
* link:{uri-curl}[curl] (at least: 7.32)
* Optional
** Documentation: link:{uri-doxygen}[Doxygen] (tested: 1.8)
** Tests: link:{uri-catch}[Catch] (tested: 2.5 / 1.2)

View File

@ -106,6 +106,13 @@ public:
*/
void set_proxy(string_view proxy);
/*!
* @brief Get new stream contents and delete them.
*
* @since 0.1.0
*/
string get_new_stream_contents();
private:
Instance &_instance;
const string_view _baseuri;

View File

@ -22,6 +22,7 @@
#include "curl/curl.h"
#include <map>
#include <mutex>
#include <string>
#include <string_view>
#include <variant>
@ -31,6 +32,7 @@ namespace mastodonpp
{
using std::map;
using std::mutex;
using std::string;
using std::string_view;
using std::variant;
@ -91,10 +93,10 @@ public:
CURLWrapper();
//! Copy constructor
CURLWrapper(const CURLWrapper &other) = default;
CURLWrapper(const CURLWrapper &other) = delete;
//! Move constructor
CURLWrapper(CURLWrapper &&other) noexcept = default;
CURLWrapper(CURLWrapper &&other) noexcept = delete;
/*!
* @brief Cleans up curl and connection.
@ -108,10 +110,10 @@ public:
virtual ~CURLWrapper() noexcept;
//! Copy assignment operator
CURLWrapper& operator=(const CURLWrapper &other) = default;
CURLWrapper& operator=(const CURLWrapper &other) = delete;
//! Move assignment operator
CURLWrapper& operator=(CURLWrapper &&other) noexcept = default;
CURLWrapper& operator=(CURLWrapper &&other) noexcept = delete;
/*!
* @brief Returns pointer to the CURL easy handle.
@ -139,7 +141,29 @@ public:
*/
void set_proxy(string_view proxy);
/*!
* @brief Cancel the stream.
*
* The stream will be cancelled, usually whithin a second. The @link
* answer_type::curl_error_code curl_error_code @endlink of the answer will
* be set to 42 (`CURLE_ABORTED_BY_CALLBACK`).
*
* @since 0.1.0
*/
void cancel_stream();
protected:
/*!
* @brief Mutex for #get_buffer a.k.a. _curl_buffer_body.
*
* This mutex is locked in `writer_body()` and
* Connection::get_new_stream_contents before anything is read or written
* from/to _curl_buffer_body.
*
* @since 0.1.0
*/
mutex buffer_mutex;
/*!
* @brief Make a HTTP request.
*
@ -153,11 +177,23 @@ protected:
answer_type make_request(const http_method &method, string uri,
const parametermap &parameters);
/*!
* @brief Returns a reference to the buffer libcurl writes into.
*
* @since 0.1.0
*/
[[nodiscard]]
string &get_buffer()
{
return _curl_buffer_body;
}
private:
CURL *_connection;
char _curl_buffer_error[CURL_ERROR_SIZE];
string _curl_buffer_headers;
string _curl_buffer_body;
bool _stream_cancelled;
/*!
* @brief libcurl write callback function.
@ -190,6 +226,25 @@ private:
return static_cast<CURLWrapper*>(f)->writer_header(data, sz, nmemb);
}
/*!
* @brief libcurl transfer info function.
*
* Used to cancel streams.
*
* @since 0.1.0
*/
int progress(void *clientp, curl_off_t dltotal, curl_off_t dlnow,
curl_off_t ultotal, curl_off_t ulnow);
//! @copydoc writer_body_wrapper
static inline int progress_wrapper(void *f, void *clientp,
curl_off_t dltotal, curl_off_t dlnow,
curl_off_t ultotal, curl_off_t ulnow)
{
return static_cast<CURLWrapper*>(f)->progress(clientp, dltotal, dlnow,
ultotal, ulnow);
}
/*!
* @brief Setup libcurl connection.
*

View File

@ -1,6 +1,6 @@
include(GNUInstallDirs)
find_package(CURL REQUIRED)
find_package(CURL 7.32 REQUIRED)
# Write version in header.
configure_file ("version.hpp.in"

View File

@ -48,4 +48,14 @@ void Connection::set_proxy(const string_view proxy)
_instance.set_proxy(proxy);
}
string Connection::get_new_stream_contents()
{
buffer_mutex.lock();
auto &buffer{get_buffer()};
auto buffer_copy{buffer};
buffer.clear();
buffer_mutex.unlock();
return buffer_copy;
}
} // namespace mastodonpp

View File

@ -40,6 +40,7 @@ static atomic<uint16_t> curlwrapper_instances{0};
CURLWrapper::CURLWrapper()
: _curl_buffer_error{}
, _stream_cancelled(false)
{
if (curlwrapper_instances == 0)
{
@ -73,9 +74,16 @@ void CURLWrapper::set_proxy(const string_view proxy)
}
}
void CURLWrapper::cancel_stream()
{
_stream_cancelled = true;
}
answer_type CURLWrapper::make_request(const http_method &method, string uri,
const parametermap &parameters)
{
_stream_cancelled = false;
CURLcode code;
switch (method)
{
@ -158,7 +166,9 @@ size_t CURLWrapper::writer_body(char *data, size_t size, size_t nmemb)
return 0;
}
buffer_mutex.lock();
_curl_buffer_body.append(data, size * nmemb);
buffer_mutex.unlock();
return size * nmemb;
}
@ -175,6 +185,16 @@ size_t CURLWrapper::writer_header(char *data, size_t size, size_t nmemb)
return size * nmemb;
}
int CURLWrapper::progress(void *, curl_off_t , curl_off_t ,
curl_off_t , curl_off_t )
{
if (_stream_cancelled)
{
return 1;
}
return 0;
}
void CURLWrapper::setup_curl()
{
if (_connection == nullptr)
@ -224,6 +244,26 @@ void CURLWrapper::setup_curl()
_curl_buffer_error};
}
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-vararg)
code = curl_easy_setopt(_connection, CURLOPT_XFERINFOFUNCTION,
progress_wrapper);
if (code != CURLE_OK)
{
throw CURLException{code, "Failed to set transfer info function",
_curl_buffer_error};
}
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-vararg)
curl_easy_setopt(_connection, CURLOPT_XFERINFODATA, this);
if (code != CURLE_OK)
{
throw CURLException{code, "Failed to set transfer info data",
_curl_buffer_error};
}
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-vararg)
curl_easy_setopt(_connection, CURLOPT_NOPROGRESS, 0L);
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-vararg)
code = curl_easy_setopt(_connection, CURLOPT_USERAGENT,
(string("mastorss/") += version).c_str());