From: Daniel Karbach Date: Wed, 16 Oct 2024 22:22:49 +0000 (+0200) Subject: auto shoutout X-Git-Url: https://git.localhorst.tv/?a=commitdiff_plain;h=2557aca39ea38d19029d6f91151ef3e2cb4668b5;p=ffmpeg-test.git auto shoutout --- diff --git a/src/app/Application.h b/src/app/Application.h index f17ee82..6ac687e 100644 --- a/src/app/Application.h +++ b/src/app/Application.h @@ -5,9 +5,11 @@ #include #include +#include "ChannelInfo.h" #include "DrawingGame.h" #include "Mixer.h" #include "Renderer.h" +#include "Shoutout.h" #include "State.h" #include "Stream.h" #include "../ffmpeg/Network.h" @@ -31,6 +33,7 @@ public: , mixer(stream.GetAudioPlane(), stream.GetAudioChannels(), stream.GetAudioFrameSize()) , renderer(stream.GetVideoPlane(), stream.GetVideoLineSize(), width, height) , state(width, height) + , own_channel_id("1020523186") , drawing_game(renderer.GetContext(), 45, 50, { 1, 1, 1 }) { state.SetGame(&drawing_game); } @@ -42,8 +45,19 @@ public: public: void Start() { - pusher_conn.Subscribe("ChatBotLog", &PusherHandler, this); - twitch_conn.Join("#horstiebot", &TwitchHandler, this); + std::cout << "starting services" << std::endl; + pusher_conn.Subscribe("Channel").Then([this](const Json::Value &json) -> void { + HandlePusherChannel(json); + }); + pusher_conn.Subscribe("ChatBotLog").Then([this](const Json::Value &json) -> void { + HandlePusherChatBotLog(json); + }); + twitch_conn.Join("#horstiebot").Then([this](const twitch::IRCMessage &msg) -> void { + HandleTwitch(msg); + }); + ws_ctx.HttpsRequest("GET", "alttp.localhorst.tv", "/api/channels?chatting=1").GetPromise().Then([this](ws::HttpsConnection &rsp) -> void { + InitChannels(rsp); + }); stream.Start(); //Media &media = state.AddMedia("test.mp4"); @@ -87,8 +101,8 @@ public: state.Clean(); - if (difference > 3000) { - std::this_thread::sleep_for(std::chrono::milliseconds(difference - 3000)); + if (difference > 1000) { + std::this_thread::sleep_for(std::chrono::milliseconds(difference - 1000)); } } @@ -98,12 +112,60 @@ public: } private: - static void PusherHandler(void *user, const Json::Value &json) { - Application *app = static_cast(user); - app->HandlePusher(json); + void InitChannels(const ws::HttpsConnection &rsp) { + Json::Value json; + Json::Reader json_reader; + json_reader.parse(rsp.GetBody(), json); + for (const Json::Value &channel : json) { + int channel_id = channel["id"].asInt(); + ChannelInfo &info = state.GetChannelInfo(channel_id); + info.Update(channel); + } } - void HandlePusher(const Json::Value &json) { + void HandlePusherChannel(const Json::Value &json) { + const std::string event = json["event"].asString(); + if (event != "ChannelUpdated") return; + + const std::string data_string = json["data"].asString(); + Json::Value data; + Json::Reader json_reader; + json_reader.parse(data_string, data); + UpdateChannel(data["model"]); + } + + void UpdateChannel(const Json::Value &json) { + int channel_id = json["id"].asInt(); + bool is_live =json["twitch_live"].asBool(); + bool is_known = state.IsChannelKnown(channel_id); + ChannelInfo &channel = state.GetChannelInfo(channel_id); + bool went_live = !channel.twitch_live && is_live; + bool went_down = channel.twitch_live && !is_live; + channel.Update(json); + if (went_live) { + // channel went live + std::cout << "channel " << channel.title << " went live" << std::endl; + ShoutoutChannel(channel_id); + } else if (went_down) { + // channel went down + std::cout << "channel " << channel.title << " went down" << std::endl; + } + } + + void ShoutoutChannel(int id) { + ChannelInfo &channel = state.GetChannelInfo(id); + Shoutout &shout = renderer.CreateShoutout(id, state); + if (!channel.twitch_id.empty() && channel.twitch_id != own_channel_id) { + ws::HttpsConnection &req = twitch_conn.AuthorizedRequest("POST", "api.twitch.tv", "/helix/chat/shoutouts"); + req.SetHeader("Content-Type", "application/x-www-form-urlencoded"); + req.AddFormUrlenc("from_broadcaster_id", own_channel_id); + req.AddFormUrlenc("to_broadcaster_id", channel.twitch_id); + req.AddFormUrlenc("moderator_id", own_channel_id); + req.SetContentLength(); + } + } + + void HandlePusherChatBotLog(const Json::Value &json) { const std::string data_string = json["data"].asString(); Json::Value data; Json::Reader json_reader; @@ -124,11 +186,6 @@ private: msg.Update(renderer.GetContext()); } - static void TwitchHandler(void *user, const twitch::IRCMessage &msg) { - Application *app = static_cast(user); - app->HandleTwitch(msg); - } - void HandleTwitch(const twitch::IRCMessage &msg) { if (state.HasGame()) { state.GetGame().Handle(msg); @@ -145,6 +202,7 @@ private: Mixer mixer; Renderer renderer; State state; + std::string own_channel_id; DrawingGame drawing_game; diff --git a/src/app/ChannelInfo.h b/src/app/ChannelInfo.h new file mode 100644 index 0000000..d002464 --- /dev/null +++ b/src/app/ChannelInfo.h @@ -0,0 +1,52 @@ +#ifndef TEST_APP_CHANNELINFO_H_ +#define TEST_APP_CHANNELINFO_H_ + +#include +#include +#include + +namespace app { + +struct ChannelInfo { + + ChannelInfo() + : id(0), title(), chat(false), join(false), twitch_live(false) { + } + explicit ChannelInfo(const Json::Value &json) + : id(json["id"].asInt()) + , title(json["title"].asString()) + , twitch_id(json["twitch_id"].asString()) + , twitch_title(json["twitch_title"].asString()) + , twitch_category(json["twitch_category_name"].asString()) + , chat(json["chat"].asBool()) + , join(json["join"].asBool()) + , twitch_live(json["twitch_live"].asBool()) { + } + + void Update(const Json::Value &json) { + if (json["id"].asInt() != id) { + throw std::runtime_error("update channel ID mismatch"); + } + title = json["title"].asString(); + twitch_id = json["twitch_id"].asString(); + twitch_title = json["twitch_title"].asString(); + twitch_category = json["twitch_category_name"].asString(); + chat = json["chat"].asBool(); + join = json["join"].asBool(); + twitch_live = json["twitch_live"].asBool(); + } + + int id; + std::string title; + std::string twitch_id; + std::string twitch_title; + std::string twitch_category; + bool chat; + bool join; + bool twitch_live; + +}; + +} + +#endif diff --git a/src/app/Clock.h b/src/app/Clock.h index c5f465b..ff98e35 100644 --- a/src/app/Clock.h +++ b/src/app/Clock.h @@ -69,6 +69,10 @@ public: return av_rescale_q(counter, timebase, AVRational{1, 1000}); } + int64_t GetSeconds() const { + return av_rescale_q(counter, timebase, AVRational{1, 1}); + } + const AVRational &GetTimebase() const { return timebase; } diff --git a/src/app/Renderer.h b/src/app/Renderer.h index 76a658e..ae10c52 100644 --- a/src/app/Renderer.h +++ b/src/app/Renderer.h @@ -1,6 +1,7 @@ #ifndef TEST_APP_RENDERER_H_ #define TEST_APP_RENDERER_H_ +#include "Shoutout.h" #include extern "C" { @@ -8,6 +9,7 @@ extern "C" { } #include "Message.h" +#include "Shoutout.h" #include "State.h" #include "../cairo/Context.h" #include "../cairo/Surface.h" @@ -36,10 +38,6 @@ public: ctx.SetSourceRGB(0, 0, 0); ctx.Paint(); - for (const Media &media : state.GetMedia()) { - media.Render(ctx); - } - for (const Message &msg : state.GetMessages()) { msg.Render(ctx); } @@ -48,6 +46,14 @@ public: state.GetGame().Render(ctx); } + for (const Media &media : state.GetMedia()) { + media.Render(ctx); + } + + if (!state.GetShoutouts().empty()) { + state.GetShoutouts().front().Render(ctx); + } + surface.Flush(); } @@ -58,6 +64,15 @@ public: return msg; } + Shoutout &CreateShoutout(int channel_id, State &state) { + Shoutout &shout = state.AddShoutout(channel_id, ctx); + shout.SetTitleFont(text_font); + shout.SetChannelFont(channel_font); + shout.SetCategoryFont(channel_font); + shout.Recalc(ctx); + return shout; + } + cairo::Context &GetContext() { return ctx; } diff --git a/src/app/Shoutout.h b/src/app/Shoutout.h new file mode 100644 index 0000000..c9dcc51 --- /dev/null +++ b/src/app/Shoutout.h @@ -0,0 +1,154 @@ +#ifndef TEST_APP_SHOUTOUT_H_ +#define TEST_APP_SHOUTOUT_H_ + +#include "ChannelInfo.h" +#include "Clock.h" +#include "../cairo/Context.h" +#include "../gfx/ColorRGB.h" +#include "../gfx/Position.h" +#include "../gfx/Spacing.h" +#include + +namespace app { + +class Shoutout { + +public: + Shoutout(const ChannelInfo &channel, cairo::Context &ctx) + : channel(channel) + , title_layout(ctx.CreateLayout()) + , channel_layout(ctx.CreateLayout()) + , category_layout(ctx.CreateLayout()) + , bg_color{ 0.1, 0.1, 0.1 } + , title_color{ 1, 1, 1 } + , channel_color{ 0.392, 0.255, 0.647 } + , anchor{ 1280, 720 - 75 } + , size{ 1280, 720 } + , padding(10) + , category_color{ 0.6, 0.6, 0.6 } + , start_time() + , running(false) + , done(false) { + title_layout.SetText(channel.twitch_title); + channel_layout.SetText(channel.title); + category_layout.SetText(channel.twitch_category); + + size.w = 1280 - 2 * 75; + title_layout.SetWidth(size.w - padding.Horizontal()); + channel_layout.SetWidth((size.w - padding.Horizontal(2)) / 2.0); + category_layout.SetWidth((size.w - padding.Horizontal(2)) / 2.0); + ctx.UpdateLayout(title_layout); + ctx.UpdateLayout(channel_layout); + ctx.UpdateLayout(category_layout); + + size.h = padding.Vertical(2) + title_layout.GetLogicalRect().height + std::max(channel_layout.GetLogicalRect().height, category_layout.GetLogicalRect().height); + } + ~Shoutout() { + } + + Shoutout(const Shoutout &) = delete; + Shoutout &operator =(const Shoutout &) = delete; + +public: + bool Loading() const { + return false; + } + + bool Running() const { + return running; + } + + bool Done() const { + return done; + } + + void SetTitleFont(pango::Font &font) { + title_layout.SetFont(font); + } + + void SetChannelFont(pango::Font &font) { + channel_layout.SetFont(font); + } + + void SetCategoryFont(pango::Font &font) { + category_layout.SetFont(font); + } + + void Start(const Clock &clock) { + start_time = clock; + running = true; + } + + void Update(cairo::Context &ctx, const Clock &clock) { + const Clock runtime = clock.Difference(start_time); + int64_t ms = runtime.GetMS(); + if (ms < 600) { + anchor.x = runtime.InterpolateClamp(1280, 75, 0, 600); + anchor.y = 720 - 75; + } else if (ms > 59000) { + anchor.x = 75; + anchor.y = runtime.InterpolateClamp(720 - 75, 720 + size.h, 59000, 59400); + } else { + anchor.x = 75; + anchor.y = 720 - 75; + } + if (ms > 125000) { + done = true; + } + } + + void Recalc(cairo::Context &ctx) { + ctx.UpdateLayout(title_layout); + ctx.UpdateLayout(channel_layout); + ctx.UpdateLayout(category_layout); + + size.h = padding.Vertical(2) + title_layout.GetLogicalRect().height + std::max(channel_layout.GetLogicalRect().height, category_layout.GetLogicalRect().height); + } + + void Render(cairo::Context &ctx) const { + gfx::Position pos = anchor - gfx::Position{ 0, size.h }; + + ctx.SetSourceColor(bg_color); + ctx.Rectangle(pos, size); + ctx.Fill(); + + gfx::Position title_pos = pos + padding.InnerTL(size); + ctx.MoveTo(title_pos); + ctx.SetSourceColor(title_color); + ctx.DrawLayout(title_layout); + + gfx::Position channel_pos = pos + padding.InnerBL(size) - gfx::Position{ 0.0, double(channel_layout.GetLogicalRect().height) }; + ctx.MoveTo(channel_pos); + ctx.SetSourceColor(channel_color); + ctx.DrawLayout(channel_layout); + + gfx::Position category_pos = pos + padding.InnerBR(size) - gfx::Position{ double(category_layout.GetLogicalRect().width), double(category_layout.GetLogicalRect().height) }; + ctx.MoveTo(category_pos); + ctx.SetSourceColor(category_color); + ctx.DrawLayout(category_layout); + } + +private: + const ChannelInfo &channel; + + pango::Layout title_layout; + pango::Layout channel_layout; + pango::Layout category_layout; + gfx::ColorRGB bg_color; + gfx::ColorRGB title_color; + gfx::ColorRGB channel_color; + gfx::ColorRGB category_color; + + gfx::Position anchor; + gfx::Size size; + gfx::Spacing padding; + + Clock start_time; + bool running; + bool done; + +}; + +} + +#endif diff --git a/src/app/State.h b/src/app/State.h index fcf6828..b532aec 100644 --- a/src/app/State.h +++ b/src/app/State.h @@ -2,12 +2,15 @@ #define TEST_APP_STATE_H_ #include +#include #include +#include "ChannelInfo.h" #include "Clock.h" #include "Game.h" #include "Media.h" #include "Message.h" +#include "Shoutout.h" #include "../cairo/Context.h" #include "../gfx/Position.h" @@ -41,6 +44,21 @@ public: game = nullptr; } + bool IsChannelKnown(int id) const { + auto it = channels.find(id); + return it != channels.end(); + } + + ChannelInfo &GetChannelInfo(int id) { + auto it = channels.find(id); + if (it != channels.end()) { + return it->second; + } + ChannelInfo &info = channels[id]; + info.id = id; + return info; + } + const std::list &GetMedia() const { return media; } @@ -49,8 +67,11 @@ public: return msgs; } + const std::list &GetShoutouts() const { + return shoutouts; + } + Media &AddMedia(const char *url) { - std::cout << "adding media " << url << std::endl; media.emplace_back(url); return media.back(); } @@ -60,6 +81,11 @@ public: return msgs.front(); } + Shoutout &AddShoutout(int channel_id, cairo::Context &ctx) { + shoutouts.emplace_back(GetChannelInfo(channel_id), ctx); + return shoutouts.back(); + } + int GetWidth() const { return width; } @@ -92,6 +118,15 @@ public: if (HasGame()) { GetGame().Update(ctx, clock); } + if (!shoutouts.empty()) { + if (shoutouts.front().Done()) { + shoutouts.pop_front(); + } else if (shoutouts.front().Running()) { + shoutouts.front().Update(ctx, clock); + } else if (!shoutouts.front().Loading()) { + shoutouts.front().Start(clock); + } + } } void Clean() { @@ -114,8 +149,11 @@ private: Game *game; + std::map channels; + std::list media; std::list msgs; + std::list shoutouts; }; diff --git a/src/gfx/Position.h b/src/gfx/Position.h index 380e67b..0f2909e 100644 --- a/src/gfx/Position.h +++ b/src/gfx/Position.h @@ -12,14 +12,18 @@ struct Position { }; +inline gfx::Position operator +(const Position &a, const Position &b) { + return gfx::Position{a.x + b.x, a.y + b.y}; } -inline gfx::Position operator +(const gfx::Position &a, const gfx::Position &b) { - return gfx::Position{a.x + b.x, a.y + b.y}; +inline gfx::Position operator -(const Position &a, const Position &b) { + return gfx::Position{a.x - b.x, a.y - b.y}; } -inline std::ostream &operator <<(std::ostream &out, const gfx::Position &pos) { +inline std::ostream &operator <<(std::ostream &out, const Position &pos) { return out << '(' << pos.x << ", " << pos.y << ')'; } +} + #endif diff --git a/src/gfx/Spacing.h b/src/gfx/Spacing.h index d6f0bf8..c2bef09 100644 --- a/src/gfx/Spacing.h +++ b/src/gfx/Spacing.h @@ -2,6 +2,7 @@ #define TEST_GFX_SPACING_H_ #include "Position.h" +#include "Size.h" namespace gfx { @@ -11,18 +12,48 @@ struct Spacing { double top = 0.0; double bottom = 0.0; double right = 0.0; + double h_inter = 0.0; + double v_inter = 0.0; explicit Spacing(double all) - : left(all), top(all), bottom(all), right(all) { + : left(all), top(all), bottom(all), right(all), h_inter(all), v_inter(all) { } Spacing(double horiz, double vert) - : left(horiz), top(vert), bottom(horiz), right(vert) { + : left(horiz), top(vert), bottom(horiz), right(vert), h_inter(horiz), v_inter(vert) { } Position Offset() const { return Position{left, top}; } + Position Offset(int nx, int ny) const { + return Position{left + h_inter * double(nx), top + v_inter * double(ny)}; + } + + double Horizontal(int n = 1) const { + return left + right + h_inter * double(n - 1); + } + + double Vertical(int n = 1) const { + return top + bottom + v_inter * double(n - 1); + } + + Position InnerTL(const Size &bounds) const { + return Position{ left, top }; + } + + Position InnerTR(const Size &bounds) const { + return Position{ bounds.w - right, top }; + } + + Position InnerBL(const Size &bounds) const { + return Position{ left, bounds.h - bottom }; + } + + Position InnerBR(const Size &bounds) const { + return Position{ bounds.w - right, bounds.h - bottom }; + } + }; } diff --git a/src/sys/Promise.h b/src/sys/Promise.h index 3a2a6ad..e20c76c 100644 --- a/src/sys/Promise.h +++ b/src/sys/Promise.h @@ -1,6 +1,7 @@ #ifndef TEST_SYS_PROMISE_H_ #define TEST_SYS_PROMISE_H_ +#include #include #include #include @@ -29,6 +30,8 @@ public: for (Callback &callback : success) { try { callback(args...); + } catch (const std::exception &e) { + std::cerr << "exception in promise resolution: " << e.what() << std::endl; } catch (...) { std::cerr << "exception in promise resolution" << std::endl; } @@ -39,6 +42,8 @@ public: for (Callback &callback : error) { try { callback(args...); + } catch (const std::exception &e) { + std::cerr << "exception in promise rejection: " << e.what() << std::endl; } catch (...) { std::cerr << "exception in promise rejection" << std::endl; } diff --git a/src/twitch/LoginToken.h b/src/twitch/LoginToken.h index 6b76617..1db38a1 100644 --- a/src/twitch/LoginToken.h +++ b/src/twitch/LoginToken.h @@ -61,6 +61,10 @@ public: return access_token; } + const std::string &GetClientId() const { + return client_id; + } + PromiseType &Refresh(ws::Context &ws); private: diff --git a/src/ws/Connection.cpp b/src/ws/Connection.cpp index 82e9c02..07ec0a2 100644 --- a/src/ws/Connection.cpp +++ b/src/ws/Connection.cpp @@ -90,7 +90,9 @@ int HttpsConnection::ProtoCallback(lws_callback_reasons reason, void *in, size_t } break; case LWS_CALLBACK_WSI_CREATE: + case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: + case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: case LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL: case LWS_CALLBACK_CLOSED_CLIENT_HTTP: break; @@ -149,9 +151,18 @@ int PusherConnection::ProtoCallback(lws_callback_reasons reason, void *in, size_ break; case LWS_CALLBACK_CLIENT_WRITEABLE: if (out_buffer.length() > LWS_PRE) { - int res = lws_write(wsi, reinterpret_cast(&out_buffer[LWS_PRE]), out_buffer.length() - LWS_PRE, LWS_WRITE_TEXT); + size_t pos = out_buffer.find('\0', LWS_PRE); + size_t len = pos == std::string::npos ? out_buffer.length() : pos; + int res = lws_write(wsi, reinterpret_cast(&out_buffer[LWS_PRE]), len - LWS_PRE, LWS_WRITE_TEXT); if (res > 0) { - out_buffer.erase(LWS_PRE, res); + if (res == len - LWS_PRE && pos != std::string::npos) { + out_buffer.erase(LWS_PRE, res + 1); + } else { + out_buffer.erase(LWS_PRE, res); + } + } + if (out_buffer.length() > LWS_PRE) { + lws_callback_on_writable(wsi); } } break; @@ -197,12 +208,7 @@ TwitchConnection::TwitchConnection(Context &ctx) info.ietf_version_or_minus_one = -1; info.userdata = &ctx; info.pwsi = &wsi; - wsi = lws_client_connect_via_info(&info); - if (!wsi) { - throw std::runtime_error("failed to connect client"); - } - lws_set_timer_usecs(wsi, 30000000); - out_buffer.insert(0, LWS_PRE, '\0'); + Connect(); token.Load(); } @@ -210,6 +216,7 @@ int TwitchConnection::ProtoCallback(lws_callback_reasons reason, void *in, size_ switch (reason) { case LWS_CALLBACK_CLIENT_ESTABLISHED: connected = true; + std::cout << "twitch connection established" << std::endl; OnConnect(); if (out_buffer.length() > LWS_PRE) { lws_callback_on_writable(wsi); @@ -217,7 +224,9 @@ int TwitchConnection::ProtoCallback(lws_callback_reasons reason, void *in, size_ break; case LWS_CALLBACK_CLIENT_CLOSED: connected = false; + authenticated = false; std::cout << "twitch connection closed" << std::endl; + Connect(); break; case LWS_CALLBACK_CLIENT_RECEIVE: if (lws_is_first_fragment(wsi)) { @@ -270,4 +279,11 @@ int TwitchConnection::ProtoCallback(lws_callback_reasons reason, void *in, size_ return 0; } +HttpsConnection &TwitchConnection::AuthorizedRequest(const char *method, const char *host, const char *path) { + HttpsConnection &req = ctx.HttpsRequest(method, host, path); + req.SetHeader("Authorization", "Bearer " + token.GetAccessToken()); + req.SetHeader("Client-Id", token.GetClientId()); + return req; +} + } diff --git a/src/ws/HttpsConnection.h b/src/ws/HttpsConnection.h index bf07a4e..c286d2d 100644 --- a/src/ws/HttpsConnection.h +++ b/src/ws/HttpsConnection.h @@ -25,15 +25,6 @@ public: HttpsConnection(const HttpsConnection &) = delete; HttpsConnection &operator =(const HttpsConnection &) = delete; -private: - struct Callback { - void *user; - void (*callback)(void *, HttpsConnection &); - void Call(HttpsConnection &val) const { - (*callback)(user, val); - } - }; - public: void SetHeader(const std::string &name, const std::string &value) { headers[name + ":"] = value; diff --git a/src/ws/PusherConnection.h b/src/ws/PusherConnection.h index dfbc1f4..b89a28f 100644 --- a/src/ws/PusherConnection.h +++ b/src/ws/PusherConnection.h @@ -4,17 +4,21 @@ #include #include #include -#include #include #include +#include "../sys/Promise.h" + namespace ws { class Context; class PusherConnection { +public: + typedef sys::Promise PromiseType; + public: explicit PusherConnection(Context &ctx); ~PusherConnection() { @@ -23,26 +27,22 @@ public: PusherConnection(const PusherConnection &) = delete; PusherConnection &operator =(const PusherConnection &) = delete; -private: - struct Callback { - void *user; - void (*callback)(void *, const Json::Value &); - void Call(const Json::Value &val) const { - (*callback)(user, val); - } - }; - public: void Ping() { SendMessage("{\"event\":\"pusher:ping\"}"); } - void Subscribe(const std::string &chan, void (*callback)(void *, const Json::Value &), void *user = nullptr) { - callbacks[chan].push_back({ user, callback }); - Json::Value json; - json["event"] = "pusher:subscribe"; - json["data"]["channel"] = chan; - SendMessage(json); + PromiseType &Subscribe(const std::string &chan) { + auto it = callbacks.find(chan); + if (it != callbacks.end()) { + return it->second; + } else { + Json::Value json; + json["event"] = "pusher:subscribe"; + json["data"]["channel"] = chan; + SendMessage(json); + return callbacks[chan]; + } } void SendMessage(const Json::Value &json) { @@ -50,11 +50,17 @@ public: } void SendMessage(const std::string &msg) { + if (out_buffer.length() > LWS_PRE) { + out_buffer.push_back('\0'); + } out_buffer.append(msg); lws_callback_on_writable(wsi); } void SendMessage(const char *msg) { + if (out_buffer.length() > LWS_PRE) { + out_buffer.push_back('\0'); + } out_buffer.append(msg); lws_callback_on_writable(wsi); } @@ -66,9 +72,7 @@ public: Json::Value json; json_reader.parse(msg, json); const std::string channel = json["channel"].asString(); - for (const Callback &callback : callbacks[channel]) { - callback.Call(json); - } + callbacks[channel].Resolve(json); } private: @@ -81,7 +85,7 @@ private: Json::Reader json_reader; Json::FastWriter json_writer; - std::map> callbacks; + std::map callbacks; }; diff --git a/src/ws/TwitchConnection.h b/src/ws/TwitchConnection.h index 7e67e34..b37a0c9 100644 --- a/src/ws/TwitchConnection.h +++ b/src/ws/TwitchConnection.h @@ -12,6 +12,8 @@ #include "../twitch/IRCMessage.h" #include "../twitch/LoginToken.h" +#include "../sys/Promise.h" +#include "HttpsConnection.h" namespace ws { @@ -19,6 +21,9 @@ class Context; class TwitchConnection { +public: + typedef sys::Promise PromiseType; + public: explicit TwitchConnection(Context &ctx); ~TwitchConnection() { @@ -27,15 +32,6 @@ public: TwitchConnection(const TwitchConnection &) = delete; TwitchConnection &operator =(const TwitchConnection &) = delete; -private: - struct Callback { - void *user; - void (*callback)(void *, const twitch::IRCMessage &); - void Call(const twitch::IRCMessage &val) const { - (*callback)(user, val); - } - }; - public: void OnConnect() { SendMessage("CAP REQ :twitch.tv/tags twitch.tv/commands"); @@ -61,10 +57,15 @@ public: SendMessage("NICK HorstieBot"); } - void Join(const std::string &chan, void (*callback)(void *, const twitch::IRCMessage &), void *user = nullptr) { - callbacks[chan].push_back({ user, callback }); - if (authenticated && callbacks[chan].size() == 1) { - SendMessage("JOIN " + chan); + PromiseType &Join(const std::string &chan) { + auto it = callbacks.find(chan); + if (it != callbacks.end()) { + return it->second; + } else { + if (authenticated) { + SendMessage("JOIN " + chan); + } + return callbacks[chan]; } } @@ -86,6 +87,8 @@ public: lws_callback_on_writable(wsi); } + HttpsConnection &AuthorizedRequest(const char *method, const char *host, const char *path); + public: int ProtoCallback(lws_callback_reasons reason, void *in, size_t len); @@ -132,10 +135,20 @@ public: if (msg.params.empty()) return; auto it = callbacks.find(msg.params[0]); if (it != callbacks.end()) { - for (const Callback &callback : it->second) { - callback.Call(msg); - } + it->second.Resolve(msg); + } + } + +private: + void Connect() { + wsi = lws_client_connect_via_info(&info); + if (!wsi) { + throw std::runtime_error("failed to connect client"); } + lws_set_timer_usecs(wsi, 30000000); + in_buffer.clear(); + out_buffer.clear(); + out_buffer.insert(0, LWS_PRE, '\0'); } private: @@ -148,7 +161,7 @@ private: std::string in_buffer; std::string out_buffer; - std::map> callbacks; + std::map callbacks; twitch::LoginToken token; twitch::IRCMessage in_msg;