From fe1c82c34dc1cb6448706e5e628043514cd73d43 Mon Sep 17 00:00:00 2001 From: Daniel Karbach Date: Wed, 23 Oct 2024 20:40:29 +0200 Subject: [PATCH] temp clips on shoutout --- Makefile | 4 +- src/app/Application.h | 74 ++++++++++++++-------- src/app/AudioReceiver.h | 12 ++-- src/app/ChannelInfo.h | 3 + src/app/Media.h | 2 +- src/app/Shoutout.cpp | 31 ++++++++++ src/app/Shoutout.h | 36 +++++++++-- src/app/Source.h | 4 ++ src/app/State.h | 25 +++++++- src/app/Stream.h | 2 +- src/app/VideoReceiver.h | 4 ++ src/ffmpeg/Decoder.h | 1 + src/ffmpeg/Frame.h | 14 ++++- src/ffmpeg/Resampler.h | 30 +++++++-- src/gfx/Rectangle.h | 5 ++ src/main.cpp | 5 ++ src/sys/Callbacks.h | 49 +++++++++++++++ src/sys/Promise.h | 106 ++++++++++++++++++++++++-------- src/twitch/Clip.h | 47 ++++++++++++++ src/twitch/IRCMessage.cpp | 2 + src/twitch/IRCMessage.h | 40 +++++++++++- src/twitch/LoginToken.cpp | 12 ++-- src/twitch/LoginToken.h | 4 +- src/ws/Connection.cpp | 125 +++++++++++++++++++++++--------------- src/ws/Context.h | 102 ++++++++++++++++++------------- src/ws/HttpsConnection.h | 20 +++++- src/ws/PusherConnection.h | 10 +-- src/ws/TwitchConnection.h | 39 +++++++++--- 28 files changed, 617 insertions(+), 191 deletions(-) create mode 100644 src/app/Shoutout.cpp create mode 100644 src/sys/Callbacks.h create mode 100644 src/twitch/Clip.h diff --git a/Makefile b/Makefile index 9b13c19..a82d071 100644 --- a/Makefile +++ b/Makefile @@ -1,10 +1,10 @@ CPP_SRCS = $(shell find src -name \*.cpp) CPP_DEPS = $(shell find src -name \*.h) -LIBS = cairo freetype2 jsoncpp libavformat libavcodec libavutil libswresample libswscale libuv libwebsockets pangocairo +LIBS = cairo freetype2 icu-uc jsoncpp libavformat libavcodec libavutil libswresample libswscale libuv libwebsockets pangocairo main: $(CPP_SRCS) $(CPP_DEPS) - clang++ -g $(shell pkg-config --cflags --libs $(LIBS)) $(CPP_SRCS) -o $@ + clang++ -gdwarf-4 $(shell pkg-config --cflags --libs $(LIBS)) $(CPP_SRCS) -o $@ compile_flags.txt: echo -xc++ > $@ diff --git a/src/app/Application.h b/src/app/Application.h index 6ac687e..7629e59 100644 --- a/src/app/Application.h +++ b/src/app/Application.h @@ -34,6 +34,8 @@ public: , renderer(stream.GetVideoPlane(), stream.GetVideoLineSize(), width, height) , state(width, height) , own_channel_id("1020523186") + , enable_realtime(false) + , enable_shoutouts(false) , drawing_game(renderer.GetContext(), 45, 50, { 1, 1, 1 }) { state.SetGame(&drawing_game); } @@ -44,33 +46,41 @@ public: Application &operator =(const Application &) = delete; public: + void EnableRealtime() { + enable_realtime = true; + } + + void EnableShoutouts() { + enable_shoutouts = true; + } + void Start() { std::cout << "starting services" << std::endl; - pusher_conn.Subscribe("Channel").Then([this](const Json::Value &json) -> void { + pusher_conn.Subscribe("Channel").Listen([this](const Json::Value &json) -> void { HandlePusherChannel(json); }); - pusher_conn.Subscribe("ChatBotLog").Then([this](const Json::Value &json) -> void { + pusher_conn.Subscribe("ChatBotLog").Listen([this](const Json::Value &json) -> void { HandlePusherChatBotLog(json); }); - twitch_conn.Join("#horstiebot").Then([this](const twitch::IRCMessage &msg) -> void { + twitch_conn.Join("#horstiebot").Listen([this](const twitch::IRCMessage &msg) -> void { HandleTwitch(msg); }); - ws_ctx.HttpsRequest("GET", "alttp.localhorst.tv", "/api/channels?chatting=1").GetPromise().Then([this](ws::HttpsConnection &rsp) -> void { - InitChannels(rsp); + ws_ctx.HttpsRequest("GET", "alttp.localhorst.tv", "/api/channels?chatting=1").GetPromise().Then([this](ws::HttpsConnection *rsp) -> void { + InitChannels(*rsp); }); stream.Start(); - //Media &media = state.AddMedia("test.mp4"); - //Clock sync_point = stream.GetVideoClock(); - //sync_point.Advance(600); - //media.SetSyncPoint(sync_point); - //media.AddWindow({ 0, 0, 1920, 1080 }, { 600, 50, 640, 360 }); - - //Media &media = state.AddMedia("test.mkv"); - //Clock sync_point = stream.GetVideoClock(); - //sync_point.Advance(600); - //media.SetSyncPoint(sync_point); - //media.AddWindow({ 0, 0, 1280, 720 }, { 600, 50, 640, 360 }); + //Media &media_a = state.AddMedia("test.mp4"); + //Clock sync_point_a = stream.GetVideoClock(); + //sync_point_a.Advance(1200); + //media_a.SetSyncPoint(sync_point_a); + //media_a.AddWindow({ 0, 0, 1, 1 }, { 100, 250, 640, 360 }); + + //Media &media_b = state.AddMedia("test.mkv"); + //Clock sync_point_b = stream.GetVideoClock(); + //sync_point_b.Advance(600); + //media_b.SetSyncPoint(sync_point_b); + //media_b.AddWindow({ 0, 0, 1, 1 }, { 600, 50, 640, 360 }); } void Step() { @@ -83,7 +93,7 @@ public: stream.PrepareVideoFrame(); state.PullVideo(stream.GetVideoClock()); - if (target > 0 && difference < 0) { + if (enable_realtime && target > 0 && difference < 0) { std::cout << (difference / -1000.0) << "s behind schedule, dropping frame" << std::endl; } else { state.Update(renderer.GetContext(), stream.GetVideoClock()); @@ -101,12 +111,13 @@ public: state.Clean(); - if (difference > 1000) { + if (enable_realtime && difference > 1000) { std::this_thread::sleep_for(std::chrono::milliseconds(difference - 1000)); } } void Stop() { + twitch_conn.SetClosed(); ws_ctx.Shutdown(); stream.Finish(); } @@ -121,6 +132,7 @@ private: ChannelInfo &info = state.GetChannelInfo(channel_id); info.Update(channel); } + ShoutoutChannel(33); } void HandlePusherChannel(const Json::Value &json) { @@ -136,7 +148,7 @@ private: void UpdateChannel(const Json::Value &json) { int channel_id = json["id"].asInt(); - bool is_live =json["twitch_live"].asBool(); + bool is_live = json["twitch_live"].asBool(); bool is_known = state.IsChannelKnown(channel_id); ChannelInfo &channel = state.GetChannelInfo(channel_id); bool went_live = !channel.twitch_live && is_live; @@ -145,7 +157,9 @@ private: if (went_live) { // channel went live std::cout << "channel " << channel.title << " went live" << std::endl; - ShoutoutChannel(channel_id); + if (channel.chat) { + ShoutoutChannel(channel_id); + } } else if (went_down) { // channel went down std::cout << "channel " << channel.title << " went down" << std::endl; @@ -156,12 +170,10 @@ private: ChannelInfo &channel = state.GetChannelInfo(id); Shoutout &shout = renderer.CreateShoutout(id, state); if (!channel.twitch_id.empty() && channel.twitch_id != own_channel_id) { - ws::HttpsConnection &req = twitch_conn.AuthorizedRequest("POST", "api.twitch.tv", "/helix/chat/shoutouts"); - req.SetHeader("Content-Type", "application/x-www-form-urlencoded"); - req.AddFormUrlenc("from_broadcaster_id", own_channel_id); - req.AddFormUrlenc("to_broadcaster_id", channel.twitch_id); - req.AddFormUrlenc("moderator_id", own_channel_id); - req.SetContentLength(); + if (enable_shoutouts) { + twitch_conn.Shoutout(own_channel_id, channel.twitch_id); + } + shout.FetchClip(twitch_conn); } } @@ -187,6 +199,14 @@ private: } void HandleTwitch(const twitch::IRCMessage &msg) { + if (msg.StartsWith("!so ") && msg.IsMod()) { + std::string name = msg.GetText().length() > 4 && msg.GetText()[4] == '@' + ? msg.GetText().substr(5) : msg.GetText().substr(4); + const ChannelInfo *channel = state.FindChannelInfo(name); + if (channel) { + ShoutoutChannel(channel->id); + } + } if (state.HasGame()) { state.GetGame().Handle(msg); } @@ -203,6 +223,8 @@ private: Renderer renderer; State state; std::string own_channel_id; + bool enable_realtime; + bool enable_shoutouts; DrawingGame drawing_game; diff --git a/src/app/AudioReceiver.h b/src/app/AudioReceiver.h index 50bb340..91139b8 100644 --- a/src/app/AudioReceiver.h +++ b/src/app/AudioReceiver.h @@ -32,18 +32,14 @@ public: encoder.SetSampleRate(48000); encoder.SetSampleFormat(AV_SAMPLE_FMT_FLT); encoder.Open(); - resampler.SetOpt("in_channel_count", decoder.GetChannelLayout().nb_channels); + resampler.SetOpt("in_channel_layout", decoder.GetChannelLayout()); resampler.SetOpt("in_sample_rate", decoder.GetSampleRate()); resampler.SetOpt("in_sample_fmt", decoder.GetSampleFormat()); - resampler.SetOpt("out_channel_count", encoder.GetChannelLayout().nb_channels); + resampler.SetOpt("out_channel_layout", encoder.GetChannelLayout()); resampler.SetOpt("out_sample_rate", encoder.GetSampleRate()); resampler.SetOpt("out_sample_fmt", encoder.GetSampleFormat()); resampler.Init(); - if (encoder.GetFrameSize() > 0) { - output_frame.AllocateAudio(encoder.GetFrameSize(), encoder.GetSampleFormat(), encoder.GetChannelLayout()); - } else { - output_frame.AllocateAudio(decoder.GetFrameSize(), encoder.GetSampleFormat(), encoder.GetChannelLayout()); - } + output_frame.AllocateAudio(resampler.GetOutSamples(decoder.GetFrameSize()), encoder.GetSampleFormat(), encoder.GetChannelLayout()); clock = Clock(encoder.GetTimeBase()); } ~AudioReceiver() { @@ -91,7 +87,7 @@ public: bool res = decoder.ReceiveFrame(input_frame); if (res) { ready = true; - int converted = resampler.Convert(encoder, input_frame, output_frame); + int converted = resampler.Convert(decoder, input_frame, encoder, output_frame); Buffer(converted); clock.Advance(converted); } diff --git a/src/app/ChannelInfo.h b/src/app/ChannelInfo.h index d002464..e41722a 100644 --- a/src/app/ChannelInfo.h +++ b/src/app/ChannelInfo.h @@ -16,6 +16,7 @@ struct ChannelInfo { : id(json["id"].asInt()) , title(json["title"].asString()) , twitch_id(json["twitch_id"].asString()) + , twitch_chat(json["twitch_chat"].asString()) , twitch_title(json["twitch_title"].asString()) , twitch_category(json["twitch_category_name"].asString()) , chat(json["chat"].asBool()) @@ -29,6 +30,7 @@ struct ChannelInfo { } title = json["title"].asString(); twitch_id = json["twitch_id"].asString(); + twitch_chat = json["twitch_chat"].asString(); twitch_title = json["twitch_title"].asString(); twitch_category = json["twitch_category_name"].asString(); chat = json["chat"].asBool(); @@ -39,6 +41,7 @@ struct ChannelInfo { int id; std::string title; std::string twitch_id; + std::string twitch_chat; std::string twitch_title; std::string twitch_category; bool chat; diff --git a/src/app/Media.h b/src/app/Media.h index d875d3a..7ecec17 100644 --- a/src/app/Media.h +++ b/src/app/Media.h @@ -31,7 +31,7 @@ public: } void AddWindow(const gfx::Rectangle &src, const gfx::Rectangle &dst) { - windows.push_back({ src, dst }); + windows.push_back({ src.ScaleTo(source.GetVideoSize()), dst }); } void PullAudio(const Clock &clock, int frame_size) { diff --git a/src/app/Shoutout.cpp b/src/app/Shoutout.cpp new file mode 100644 index 0000000..7c14b2d --- /dev/null +++ b/src/app/Shoutout.cpp @@ -0,0 +1,31 @@ +#include "Shoutout.h" + +#include + +#include "State.h" + +namespace app { + +void Shoutout::LoadRandomClip(const Json::Value &json) { + const Json::Value &data = json["data"]; + if (!data.isArray()) return; + if (data.empty()) return; + Json::ArrayIndex num = data.size(); + std::uniform_int_distribution dist(0, num - 1); + Json::ArrayIndex choice = dist(state.GetRNG()); + const Json::Value &clip_json = data[choice]; + clip = twitch::Clip(clip_json); +} + +void Shoutout::Start(const Clock &clock) { + start_time = clock; + running = true; + if (clip.HasVideo()) { + std::cout << "adding clip " << clip.GetVideoURL() << " at " << clock << std::endl; + Media &media = state.AddMedia(clip.GetVideoURL().c_str()); + media.SetSyncPoint(clock); + media.AddWindow({ 0, 0, 1, 1 }, { 320, 150, 640, 360 }); + } +} + +} diff --git a/src/app/Shoutout.h b/src/app/Shoutout.h index c9dcc51..0da797e 100644 --- a/src/app/Shoutout.h +++ b/src/app/Shoutout.h @@ -7,15 +7,22 @@ #include "../gfx/ColorRGB.h" #include "../gfx/Position.h" #include "../gfx/Spacing.h" +#include "../twitch/Clip.h" +#include "../ws/TwitchConnection.h" +#include "json/forwards.h" +#include "json/value.h" #include namespace app { +class State; + class Shoutout { public: - Shoutout(const ChannelInfo &channel, cairo::Context &ctx) + Shoutout(const ChannelInfo &channel, cairo::Context &ctx, State &state) : channel(channel) + , state(state) , title_layout(ctx.CreateLayout()) , channel_layout(ctx.CreateLayout()) , category_layout(ctx.CreateLayout()) @@ -28,7 +35,8 @@ public: , category_color{ 0.6, 0.6, 0.6 } , start_time() , running(false) - , done(false) { + , done(false) + , fetching_clip(false) { title_layout.SetText(channel.twitch_title); channel_layout.SetText(channel.title); category_layout.SetText(channel.twitch_category); @@ -51,7 +59,7 @@ public: public: bool Loading() const { - return false; + return fetching_clip; } bool Running() const { @@ -74,11 +82,23 @@ public: category_layout.SetFont(font); } - void Start(const Clock &clock) { - start_time = clock; - running = true; + void FetchClip(ws::TwitchConnection &twitch) { + fetching_clip = true; + twitch.FetchClips(channel.twitch_id) + .Then([this](const Json::Value *rsp) -> void { + fetching_clip = false; + LoadRandomClip(*rsp); + }) + .Catch([this](ws::HttpsConnection *rsp) -> void { + fetching_clip = false; + std::cout << "failed to fetch clips" << std::endl; + }); } + void LoadRandomClip(const Json::Value &json); + + void Start(const Clock &clock); + void Update(cairo::Context &ctx, const Clock &clock) { const Clock runtime = clock.Difference(start_time); int64_t ms = runtime.GetMS(); @@ -130,6 +150,7 @@ public: private: const ChannelInfo &channel; + State &state; pango::Layout title_layout; pango::Layout channel_layout; @@ -143,9 +164,12 @@ private: gfx::Size size; gfx::Spacing padding; + twitch::Clip clip; + Clock start_time; bool running; bool done; + bool fetching_clip; }; diff --git a/src/app/Source.h b/src/app/Source.h index 8893798..072a29b 100644 --- a/src/app/Source.h +++ b/src/app/Source.h @@ -63,6 +63,10 @@ public: return audio.IsEOF() && video.IsEOF(); } + gfx::Size GetVideoSize() const { + return video.GetSize(); + } + cairo::Surface GetVideoSurface() { return video.GetSurface(); } diff --git a/src/app/State.h b/src/app/State.h index b532aec..a072662 100644 --- a/src/app/State.h +++ b/src/app/State.h @@ -4,6 +4,8 @@ #include #include #include +#include +#include #include "ChannelInfo.h" #include "Clock.h" @@ -20,7 +22,7 @@ class State { public: State(int width, int height) - : width(width), height(height), game(nullptr) { + : width(width), height(height), game(nullptr), rnd_gen(rnd_dev()) { } public: @@ -59,6 +61,18 @@ public: return info; } + ChannelInfo *FindChannelInfo(const std::string &str) { + for (auto &c : channels) { + if (strcasecmp(c.second.title.c_str(), str.c_str()) == 0) { + return &c.second; + } + if (c.second.twitch_chat.length() > 1 && strcasecmp(c.second.twitch_chat.c_str() + 1, str.c_str()) == 0) { + return &c.second; + } + } + return nullptr; + } + const std::list &GetMedia() const { return media; } @@ -82,7 +96,7 @@ public: } Shoutout &AddShoutout(int channel_id, cairo::Context &ctx) { - shoutouts.emplace_back(GetChannelInfo(channel_id), ctx); + shoutouts.emplace_back(GetChannelInfo(channel_id), ctx, *this); return shoutouts.back(); } @@ -94,6 +108,10 @@ public: return height; } + std::mt19937 &GetRNG() { + return rnd_gen; + } + void PullAudio(const Clock &clock, int frame_size) { for (Media &m : media) { m.PullAudio(clock, frame_size); @@ -155,6 +173,9 @@ private: std::list msgs; std::list shoutouts; + std::random_device rnd_dev; + std::mt19937 rnd_gen; + }; } diff --git a/src/app/Stream.h b/src/app/Stream.h index c595808..3670643 100644 --- a/src/app/Stream.h +++ b/src/app/Stream.h @@ -127,7 +127,7 @@ public: } void PushAudioFrame() { - resampler.Convert(audio_encoder, audio_input_frame, audio_output_frame); + resampler.Convert(audio_encoder, audio_input_frame, audio_encoder, audio_output_frame); audio_output_frame.SetPresentationTimestamp(audio_clock.GetCounter()); audio_encoder.SendFrame(audio_output_frame); while (audio_encoder.ReceivePacket(audio_packet)) { diff --git a/src/app/VideoReceiver.h b/src/app/VideoReceiver.h index 21fc61d..1ae2016 100644 --- a/src/app/VideoReceiver.h +++ b/src/app/VideoReceiver.h @@ -53,6 +53,10 @@ public: return ready; } + gfx::Size GetSize() const { + return gfx::Size{ double(decoder.GetWidth()), double(decoder.GetHeight()) }; + } + cairo::Surface GetSurface() { return cairo::Surface( output_frame.GetDataPlane(0), output_frame.GetPlaneLinesize(0), CAIRO_FORMAT_ARGB32, diff --git a/src/ffmpeg/Decoder.h b/src/ffmpeg/Decoder.h index 66ebeb4..142abd7 100644 --- a/src/ffmpeg/Decoder.h +++ b/src/ffmpeg/Decoder.h @@ -2,6 +2,7 @@ #define TEST_FFMPEG_DECODER_H_ #include +#include extern "C" { #include diff --git a/src/ffmpeg/Frame.h b/src/ffmpeg/Frame.h index ed07c60..2d20ac8 100644 --- a/src/ffmpeg/Frame.h +++ b/src/ffmpeg/Frame.h @@ -71,6 +71,10 @@ public: } } + const AVChannelLayout &GetChannelLayout() const { + return frame->ch_layout; + } + uint8_t **GetData() { return frame->data; } @@ -123,7 +127,15 @@ public: return frame->nb_samples; } - void SetChannelLayout(AVChannelLayout layout) { + AVSampleFormat GetSampleFormat() const { + return static_cast(frame->format); + } + + int GetSampleRate() const { + return frame->sample_rate; + } + + void SetChannelLayout(const AVChannelLayout &layout) { frame->ch_layout = layout; } diff --git a/src/ffmpeg/Resampler.h b/src/ffmpeg/Resampler.h index ce378c2..e6caf3c 100644 --- a/src/ffmpeg/Resampler.h +++ b/src/ffmpeg/Resampler.h @@ -3,9 +3,12 @@ #include "CodecContext.h" #include +#include +#include #include extern "C" { +#include #include #include #include @@ -46,10 +49,14 @@ public: Resampler &operator =(const Resampler &) = delete; public: - int Convert(const CodecContext &codec, const Frame &src, Frame &dst) { - int64_t from = swr_get_delay(ctx, codec.GetSampleRate()) + src.GetSamples(); - int64_t nb_samples = av_rescale_rnd(from, codec.GetSampleRate(), codec.GetSampleRate(), AV_ROUND_UP); - int res = swr_convert(ctx, dst.GetData(), nb_samples, src.GetData(), src.GetSamples()); + int Convert(const CodecContext &src_codec, const Frame &src, const CodecContext &dst_codec, Frame &dst) { + //int64_t from = swr_get_delay(ctx, src_codec.GetSampleRate()) + src.GetSamples(); + //int64_t nb_samples = av_rescale_rnd(from, dst_codec.GetSampleRate(), src_codec.GetSampleRate(), AV_ROUND_UP); + //dst.EnsureAudioAllocated(nb_samples); + int res = swr_convert(ctx, dst.GetData(), dst.GetSamples(), src.GetData(), src.GetSamples()); + if (res > dst.GetSamples()) { + std::cout << "ERROR: wrote more samples than space available!" << std::endl; + } if (res < 0) { throw Error("failed to resample", res); } @@ -63,6 +70,14 @@ public: } } + int GetOutSamples(int in_samples) const { + int out_samples = swr_get_out_samples(ctx, in_samples); + if (out_samples < 0) { + throw Error("unable to calculate output samples", out_samples); + } + return out_samples; + } + void SetOpt(const char *name, int value) { int res = av_opt_set_int(ctx, name, value, 0); if (res != 0) { @@ -77,6 +92,13 @@ public: } } + void SetOpt(const char *name, const AVChannelLayout &value) { + int res = av_opt_set_chlayout(ctx, name, &value, 0); + if (res != 0) { + throw Error("failed to set option", res); + } + } + private: SwrContext *ctx; diff --git a/src/gfx/Rectangle.h b/src/gfx/Rectangle.h index bd14cb5..31fa730 100644 --- a/src/gfx/Rectangle.h +++ b/src/gfx/Rectangle.h @@ -2,6 +2,7 @@ #define TEST_GFX_RECTANGLE_H_ #include "Position.h" +#include "Size.h" namespace gfx { @@ -16,6 +17,10 @@ struct Rectangle { return gfx::Position{x, y}; } + Rectangle ScaleTo(const Size &size) const { + return Rectangle{ x * size.w, y * size.h, w * size.w, h * size.h }; + } + }; } diff --git a/src/main.cpp b/src/main.cpp index b652215..92b7370 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,4 +1,5 @@ #include +#include #include #include "app/Application.h" @@ -25,6 +26,10 @@ int main(int argc, char**argv) { const char *URL = argc > 1 ? argv[1] : "rtmp://localhost/localhorsttv"; app::Application app(WIDTH, HEIGHT, FPS, URL); + app.EnableRealtime(); + if (std::strcmp(URL, "rtmp://localhost/horstiebot") == 0) { + app.EnableShoutouts(); + } signal(SIGINT, stop); diff --git a/src/sys/Callbacks.h b/src/sys/Callbacks.h new file mode 100644 index 0000000..a6db061 --- /dev/null +++ b/src/sys/Callbacks.h @@ -0,0 +1,49 @@ +#ifndef TEST_SYS_CALLBACKS_H_ +#define TEST_SYS_CALLBACKS_H_ + +#include +#include +#include +#include + +namespace sys { + +template +class Callbacks { + +public: + typedef std::function Callback; + +public: + Callbacks &Listen(Callback callback) { + callbacks.push_back(callback); + return *this; + } + +public: + void Fire(Args... args) { + for (Callback &callback : callbacks) { + callback(args...); + } + } + + void FireNothrow(Args... args) { + for (Callback &callback : callbacks) { + try { + callback(args...); + } catch (const std::exception &e) { + std::cerr << "exception in nothrow callback: " << e.what() << std::endl; + } catch (...) { + std::cerr << "exception in nothrow callback" << std::endl; + } + } + } + +private: + std::vector callbacks; + +}; + +} + +#endif diff --git a/src/sys/Promise.h b/src/sys/Promise.h index e20c76c..dfd733e 100644 --- a/src/sys/Promise.h +++ b/src/sys/Promise.h @@ -4,55 +4,109 @@ #include #include #include +#include +#include #include namespace sys { -template +template class Promise { + static_assert(!std::is_reference::value, "promise result type must not be reference"); + static_assert(!std::is_reference::value, "promise error type must not be reference"); + public: - typedef std::function Callback; + typedef std::function ResultCallback; + typedef std::function ErrorCallback; + typedef Promise SelfType; public: - Promise &Then(Callback callback) { - success.push_back(callback); + Promise(): data(std::make_shared()) { + } + +public: + SelfType &Then(ResultCallback callback) { + if (data->status == PENDING) { + data->on_success.push_back(callback); + } + if (data->status == RESOLVED) { + RunResultCallback(callback); + } return *this; } - Promise &Catch(Callback callback) { - error.push_back(callback); + SelfType &Catch(ErrorCallback callback) { + if (data->status == PENDING) { + data->on_error.push_back(callback); + } + if (data->status == REJECTED) { + RunErrorCallback(callback); + } return *this; } public: - void Resolve(Args... args) { - for (Callback &callback : success) { - try { - callback(args...); - } catch (const std::exception &e) { - std::cerr << "exception in promise resolution: " << e.what() << std::endl; - } catch (...) { - std::cerr << "exception in promise resolution" << std::endl; - } + void Resolve(ResultType result) { + if (data->status != PENDING) { + std::cout << "resolution of completed promise" << std::endl; + return; + } + data->result = result; + data->status = RESOLVED; + for (ResultCallback &callback : data->on_success) { + RunResultCallback(callback); } + data->on_success.clear(); + data->on_error.clear(); } - void Reject(Args... args) { - for (Callback &callback : error) { - try { - callback(args...); - } catch (const std::exception &e) { - std::cerr << "exception in promise rejection: " << e.what() << std::endl; - } catch (...) { - std::cerr << "exception in promise rejection" << std::endl; - } + void Reject(ErrorType error) { + if (data->status != PENDING) { + std::cout << "rejection of completed promise" << std::endl; + return; + } + data->error = error; + data->status = REJECTED; + for (ErrorCallback &callback : data->on_error) { + RunErrorCallback(callback); + } + data->on_success.clear(); + data->on_error.clear(); + } + +private: + void RunResultCallback(ResultCallback &callback) { + try { + callback(data->result); + } catch (const std::exception &e) { + std::cerr << "exception in promise resolution: " << e.what() << std::endl; + } catch (...) { + std::cerr << "exception in promise resolution" << std::endl; } } + void RunErrorCallback(ErrorCallback &callback) { + try { + callback(data->error); + } catch (const std::exception &e) { + std::cerr << "exception in promise rejection: " << e.what() << std::endl; + } catch (...) { + std::cerr << "exception in promise rejection" << std::endl; + } + } +private: + enum Status { PENDING, RESOLVED, REJECTED }; + struct Data { + std::vector on_success; + std::vector on_error; + ResultType result; + ErrorType error; + Status status = PENDING; + }; + private: - std::vector success; - std::vector error; + std::shared_ptr data; }; diff --git a/src/twitch/Clip.h b/src/twitch/Clip.h new file mode 100644 index 0000000..e9d7172 --- /dev/null +++ b/src/twitch/Clip.h @@ -0,0 +1,47 @@ +#ifndef TEST_TWITCH_CLIP_H_ +#define TEST_TWITCH_CLIP_H_ + +#include +#include + +namespace twitch { + +class Clip { + +public: + Clip() { + } + explicit Clip(const Json::Value &json) + : broadcaster_name(json["broadcaster_name"].asString()) + , creator_name(json["creator_name"].asString()) + , thumbnail_url(json["thumbnail_url"].asString()) + , title(json["title"].asString()) { + std::cout << "clip: " << json << std::endl; + size_t thumb_pos = thumbnail_url.find("-preview-"); + if (thumb_pos != std::string::npos) { + video_url = thumbnail_url.substr(0, thumb_pos); + video_url += ".mp4"; + } + } + +public: + bool HasVideo() const { + return !video_url.empty(); + } + + const std::string &GetVideoURL() const { + return video_url; + } + +private: + std::string broadcaster_name; + std::string creator_name; + std::string thumbnail_url; + std::string title; + std::string video_url; + +}; + +} + +#endif diff --git a/src/twitch/IRCMessage.cpp b/src/twitch/IRCMessage.cpp index 90ea381..f2b148d 100644 --- a/src/twitch/IRCMessage.cpp +++ b/src/twitch/IRCMessage.cpp @@ -3,6 +3,8 @@ namespace twitch { +std::string IRCMessage::THE_EMPTY_STRING; + void IRCMessage::Decode(std::string::const_iterator begin, std::string::const_iterator input_end) { command.clear(); params.clear(); diff --git a/src/twitch/IRCMessage.h b/src/twitch/IRCMessage.h index db25ae5..cbcc929 100644 --- a/src/twitch/IRCMessage.h +++ b/src/twitch/IRCMessage.h @@ -21,8 +21,21 @@ public: void Decode(std::string::const_iterator begin, std::string::const_iterator end); void Encode(std::string &out) const; - std::string GetText() const { - return params.empty() ? "" : params.back(); + const std::string &GetTag(const std::string &name) const { + auto it = tags.find(name); + return it != tags.end() ? it->second : THE_EMPTY_STRING; + } + + const std::string &GetTarget() const { + return params.empty() ? THE_EMPTY_STRING : params.front(); + } + + const std::string &GetText() const { + return params.empty() ? THE_EMPTY_STRING : params.back(); + } + + bool HasTag(const std::string &name) const { + return tags.find(name) != tags.end(); } bool IsLoginSuccess() const { @@ -41,6 +54,26 @@ public: return command == "PRIVMSG"; } + bool IsOwner() const { + return GetTarget().substr(1) == nick; + } + + bool IsMod() const { + return IsOwner() || GetTag("mod") == "1"; + } + + bool StartsWith(char c) const { + return !GetText().empty() && GetText()[0] == c; + } + + bool StartsWith(const char *str) const { + return GetText().rfind(str, 0) == 0; + } + + bool StartsWith(const std::string &str) const { + return GetText().rfind(str, 0) == 0; + } + IRCMessage MakePong() const { IRCMessage pong; pong.command = "PONG"; @@ -63,6 +96,9 @@ public: std::string server; std::map tags; +private: + static std::string THE_EMPTY_STRING; + }; inline std::ostream &operator <<(std::ostream &out, const IRCMessage &msg) { diff --git a/src/twitch/LoginToken.cpp b/src/twitch/LoginToken.cpp index 6feb6fd..91cd71f 100644 --- a/src/twitch/LoginToken.cpp +++ b/src/twitch/LoginToken.cpp @@ -20,11 +20,11 @@ LoginToken::PromiseType &LoginToken::Refresh(ws::Context &ws) { req.AddFormUrlenc("refresh_token", refresh_token); req.SetContentLength(); req.GetPromise() - .Then([this](ws::HttpsConnection &rsp) -> void { - HandleRefreshComplete(rsp); + .Then([this](ws::HttpsConnection *rsp) -> void { + HandleRefreshComplete(*rsp); }) - .Catch([this](ws::HttpsConnection &rsp) -> void { - HandleRefreshError(rsp); + .Catch([this](ws::HttpsConnection *rsp) -> void { + HandleRefreshError(*rsp); }); return promise; } @@ -46,14 +46,14 @@ void LoginToken::HandleRefreshComplete(ws::HttpsConnection &rsp) { std::time(&now); expires = now + expires_in; Save(); - promise.Resolve(*this); + promise.Resolve(this); } void LoginToken::HandleRefreshError(ws::HttpsConnection &rsp) { is_refreshing = false; std::cout << "errored https request with status " << rsp.GetStatus() << std::endl; std::cout << "body: " << rsp.GetBody() << std::endl; - promise.Reject(*this); + promise.Reject(&rsp); } } diff --git a/src/twitch/LoginToken.h b/src/twitch/LoginToken.h index 1db38a1..ce1898a 100644 --- a/src/twitch/LoginToken.h +++ b/src/twitch/LoginToken.h @@ -19,7 +19,7 @@ namespace twitch { class LoginToken { public: - typedef sys::Promise PromiseType; + typedef sys::Promise PromiseType; public: LoginToken(): expires(0), is_refreshing(false) { @@ -51,7 +51,7 @@ public: out << json << std::endl; } - bool HasExpired() { + bool HasExpired() const { time_t now; std::time(&now); return expires < now; diff --git a/src/ws/Connection.cpp b/src/ws/Connection.cpp index 07ec0a2..12dced7 100644 --- a/src/ws/Connection.cpp +++ b/src/ws/Connection.cpp @@ -5,6 +5,7 @@ #include #include +#include #include #include @@ -12,17 +13,17 @@ namespace ws { -HttpsConnection::HttpsConnection(Context &ctx, const char *method, const char *host, const char *path) -: info{0}, wsi(nullptr), read_buffer{0}, status(0) { +HttpsConnection::HttpsConnection(Context &ctx, const std::string &m, const std::string &h, const std::string &p) +: method(m), host(h), path(p), info{0}, wsi(nullptr), read_buffer{0}, status(0) { info.context = ctx.GetContext(); info.opaque_user_data = this; - info.address = host; + info.address = host.c_str(); info.port = 443; info.ssl_connection = 1; - info.path = path; - info.host = host; + info.path = path.c_str(); + info.host = host.c_str(); info.origin = "test"; - info.method = method; + info.method = method.c_str(); info.protocol = "https"; info.ietf_version_or_minus_one = -1; info.userdata = &ctx; @@ -37,7 +38,7 @@ HttpsConnection::HttpsConnection(Context &ctx, const char *method, const char *h int HttpsConnection::ProtoCallback(lws_callback_reasons reason, void *in, size_t len) { switch (reason) { case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: - promise.Reject(*this); + promise.Reject(this); break; case LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP: status = lws_http_client_http_response(wsi); @@ -54,7 +55,7 @@ int HttpsConnection::ProtoCallback(lws_callback_reasons reason, void *in, size_t } break; case LWS_CALLBACK_COMPLETED_CLIENT_HTTP: - promise.Resolve(*this); + promise.Resolve(this); break; case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: if (!lws_http_is_redirected_to_get(wsi)) { @@ -89,13 +90,6 @@ int HttpsConnection::ProtoCallback(lws_callback_reasons reason, void *in, size_t } } break; - case LWS_CALLBACK_WSI_CREATE: - case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: - case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: - case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: - case LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL: - case LWS_CALLBACK_CLOSED_CLIENT_HTTP: - break; default: std::cout << "unhandled https connection proto callback, reason: " << reason << ", in: " << in << ", len: " << len << std::endl; if (in && len) { @@ -170,19 +164,6 @@ int PusherConnection::ProtoCallback(lws_callback_reasons reason, void *in, size_ Ping(); lws_set_timer_usecs(wsi, 30000000); break; - case LWS_CALLBACK_CLIENT_RECEIVE_PONG: - case LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL: - case LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL: - case LWS_CALLBACK_WS_CLIENT_BIND_PROTOCOL: - case LWS_CALLBACK_WS_CLIENT_DROP_PROTOCOL: - case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: - case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: - case LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP: - case LWS_CALLBACK_CLOSED_CLIENT_HTTP: - case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: - case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: - case LWS_CALLBACK_WSI_CREATE: - break; default: std::cout << "unhandled pusher connection proto callback, reason: " << reason << ", in: " << in << ", len: " << len << std::endl; if (in && len) { @@ -194,7 +175,7 @@ int PusherConnection::ProtoCallback(lws_callback_reasons reason, void *in, size_ } TwitchConnection::TwitchConnection(Context &ctx) -: ctx(ctx), info{0}, wsi(nullptr), connected(false), authenticated(false) { +: ctx(ctx), info{0}, wsi(nullptr), connected(false), authenticated(false), closed(false) { info.context = ctx.GetContext(); info.opaque_user_data = this; // wss://irc-ws.chat.twitch.tv:443 @@ -226,7 +207,9 @@ int TwitchConnection::ProtoCallback(lws_callback_reasons reason, void *in, size_ connected = false; authenticated = false; std::cout << "twitch connection closed" << std::endl; - Connect(); + if (!closed) { + Connect(); + } break; case LWS_CALLBACK_CLIENT_RECEIVE: if (lws_is_first_fragment(wsi)) { @@ -256,19 +239,6 @@ int TwitchConnection::ProtoCallback(lws_callback_reasons reason, void *in, size_ Ping(); lws_set_timer_usecs(wsi, 60000000); break; - case LWS_CALLBACK_CLIENT_RECEIVE_PONG: - case LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL: - case LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL: - case LWS_CALLBACK_WS_CLIENT_BIND_PROTOCOL: - case LWS_CALLBACK_WS_CLIENT_DROP_PROTOCOL: - case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: - case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: - case LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP: - case LWS_CALLBACK_CLOSED_CLIENT_HTTP: - case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: - case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: - case LWS_CALLBACK_WSI_CREATE: - break; default: std::cout << "unhandled twitch connection proto callback, reason: " << reason << ", in: " << in << ", len: " << len << std::endl; if (in && len) { @@ -279,11 +249,70 @@ int TwitchConnection::ProtoCallback(lws_callback_reasons reason, void *in, size_ return 0; } -HttpsConnection &TwitchConnection::AuthorizedRequest(const char *method, const char *host, const char *path) { - HttpsConnection &req = ctx.HttpsRequest(method, host, path); - req.SetHeader("Authorization", "Bearer " + token.GetAccessToken()); - req.SetHeader("Client-Id", token.GetClientId()); - return req; +TwitchConnection::RequestPromise TwitchConnection::AuthorizedRequest(const std::string &method, const std::string &host, const std::string &path) { + RequestPromise promise; + if (TokenValid()) { + HttpsConnection &req = ctx.HttpsRequest(method.c_str(), host.c_str(), path.c_str()); + req.SetHeader("Authorization", "Bearer " + token.GetAccessToken()); + req.SetHeader("Client-Id", token.GetClientId()); + promise.Resolve(&req); + } else { + token.Refresh(ctx) + .Then([=](twitch::LoginToken *token) mutable -> void { + HttpsConnection &req = ctx.HttpsRequest(method.c_str(), host.c_str(), path.c_str()); + req.SetHeader("Authorization", "Bearer " + token->GetAccessToken()); + req.SetHeader("Client-Id", token->GetClientId()); + promise.Resolve(&req); + }).Catch([=](HttpsConnection *rsp) mutable -> void { + promise.Reject(rsp); + }); + } + return promise; +} + +TwitchConnection::WebPromise TwitchConnection::FetchClips(const std::string &from) { + WebPromise promise; + std::string path = "/helix/clips?first=100&broadcaster_id=" + from; + AuthorizedRequest("GET", "api.twitch.tv", path).Then([=](HttpsConnection *req) -> void { + req->SetContentLength(); + req->GetPromise() + .Then([=](HttpsConnection *rsp) mutable -> void { + if (rsp->IsPositive()) { + Json::Value json = rsp->GetBodyJSON(); + promise.Resolve(&json); + } else { + promise.Reject(rsp); + } + }) + .Catch([=](HttpsConnection *rsp) mutable -> void { + promise.Reject(rsp); + }); + }); + return promise; +} + +TwitchConnection::WebPromise TwitchConnection::Shoutout(const std::string &from, const std::string &to) { + WebPromise promise; + AuthorizedRequest("POST", "api.twitch.tv", "/helix/chat/shoutouts").Then([=](HttpsConnection *req) -> void { + req->SetHeader("Content-Type", "application/x-www-form-urlencoded"); + req->AddFormUrlenc("from_broadcaster_id", from); + req->AddFormUrlenc("to_broadcaster_id", to); + req->AddFormUrlenc("moderator_id", from); + req->SetContentLength(); + req->GetPromise() + .Then([this, promise](HttpsConnection *rsp) mutable -> void { + if (rsp->IsPositive()) { + Json::Value json = rsp->GetBodyJSON(); + promise.Resolve(&json); + } else { + promise.Reject(rsp); + } + }) + .Catch([this, promise](HttpsConnection *rsp) mutable -> void { + promise.Reject(rsp); + }); + }); + return promise; } } diff --git a/src/ws/Context.h b/src/ws/Context.h index c9abe2c..2347e9d 100644 --- a/src/ws/Context.h +++ b/src/ws/Context.h @@ -77,28 +77,32 @@ public: private: static int https_callback(lws *wsi, lws_callback_reasons reason, void *user, void *in, size_t len) { + Context *ctx = static_cast(user); void *user_data = lws_get_opaque_user_data(wsi); - Context *c = static_cast(user); - if (user_data) { - HttpsConnection *conn = static_cast(user_data); - if (reason == LWS_CALLBACK_WSI_DESTROY) { - c->RemoveHttpConnection(conn); - return 0; - } - return conn->ProtoCallback(reason, in, len); - } - if (c) { - return c->HttpsCallback(reason, in, len); - } - return 0; - } - - int HttpsCallback(lws_callback_reasons reason, void *in, size_t len) { + HttpsConnection *conn = static_cast(user_data); switch (reason) { - case LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL: + case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: + case LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP: + case LWS_CALLBACK_RECEIVE_CLIENT_HTTP_READ: + case LWS_CALLBACK_RECEIVE_CLIENT_HTTP: + case LWS_CALLBACK_COMPLETED_CLIENT_HTTP: + case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: + case LWS_CALLBACK_CLIENT_HTTP_WRITEABLE: + return conn->ProtoCallback(reason, in, len); + case LWS_CALLBACK_CLOSED_CLIENT_HTTP: + ctx->RemoveHttpConnection(conn); + break; case LWS_CALLBACK_PROTOCOL_INIT: case LWS_CALLBACK_PROTOCOL_DESTROY: + case LWS_CALLBACK_WSI_CREATE: + case LWS_CALLBACK_WSI_DESTROY: + case LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL: case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: + case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: + case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: + case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: + case LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL: + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: break; default: std::cout << "unhandled generic https proto callback, reason: " << reason << ", in: " << in << ", len: " << len << std::endl; @@ -121,24 +125,32 @@ private: } static int pusher_callback(lws *wsi, lws_callback_reasons reason, void *user, void *in, size_t len) { + Context *ctx = static_cast(user); void *user_data = lws_get_opaque_user_data(wsi); - if (user_data) { - PusherConnection *conn = static_cast(user_data); - return conn->ProtoCallback(reason, in, len); - } - if (user) { - Context *c = static_cast(user); - return c->PusherCallback(reason, in, len); - } - return 0; - } - - int PusherCallback(lws_callback_reasons reason, void *in, size_t len) { + PusherConnection *conn = static_cast(user_data); switch (reason) { + case LWS_CALLBACK_CLIENT_ESTABLISHED: + case LWS_CALLBACK_CLIENT_CLOSED: + case LWS_CALLBACK_CLIENT_RECEIVE: + case LWS_CALLBACK_CLIENT_WRITEABLE: + case LWS_CALLBACK_TIMER: + return conn->ProtoCallback(reason, in, len); + case LWS_CALLBACK_CLIENT_RECEIVE_PONG: case LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL: + case LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL: + case LWS_CALLBACK_WS_CLIENT_BIND_PROTOCOL: + case LWS_CALLBACK_WS_CLIENT_DROP_PROTOCOL: + case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: + case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: + case LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP: + case LWS_CALLBACK_CLOSED_CLIENT_HTTP: + case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: + case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: + case LWS_CALLBACK_WSI_CREATE: case LWS_CALLBACK_PROTOCOL_INIT: case LWS_CALLBACK_PROTOCOL_DESTROY: case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: break; default: std::cout << "unhandled generic pusher proto callback, reason: " << reason << ", in: " << in << ", len: " << len << std::endl; @@ -151,24 +163,32 @@ private: } static int twitch_callback(lws *wsi, lws_callback_reasons reason, void *user, void *in, size_t len) { + Context *ctx = static_cast(user); void *user_data = lws_get_opaque_user_data(wsi); - if (user_data) { - TwitchConnection *conn = static_cast(user_data); - return conn->ProtoCallback(reason, in, len); - } - if (user) { - Context *c = static_cast(user); - return c->TwitchCallback(reason, in, len); - } - return 0; - } - - int TwitchCallback(lws_callback_reasons reason, void *in, size_t len) { + TwitchConnection *conn = static_cast(user_data); switch (reason) { + case LWS_CALLBACK_CLIENT_ESTABLISHED: + case LWS_CALLBACK_CLIENT_CLOSED: + case LWS_CALLBACK_CLIENT_RECEIVE: + case LWS_CALLBACK_CLIENT_WRITEABLE: + case LWS_CALLBACK_TIMER: + return conn->ProtoCallback(reason, in, len); + case LWS_CALLBACK_CLIENT_RECEIVE_PONG: case LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL: + case LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL: + case LWS_CALLBACK_WS_CLIENT_BIND_PROTOCOL: + case LWS_CALLBACK_WS_CLIENT_DROP_PROTOCOL: + case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION: + case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: + case LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP: + case LWS_CALLBACK_CLOSED_CLIENT_HTTP: + case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED: + case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH: + case LWS_CALLBACK_WSI_CREATE: case LWS_CALLBACK_PROTOCOL_INIT: case LWS_CALLBACK_PROTOCOL_DESTROY: case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS: + case LWS_CALLBACK_EVENT_WAIT_CANCELLED: break; default: std::cout << "unhandled generic twitch proto callback, reason: " << reason << ", in: " << in << ", len: " << len << std::endl; diff --git a/src/ws/HttpsConnection.h b/src/ws/HttpsConnection.h index c286d2d..2e5f617 100644 --- a/src/ws/HttpsConnection.h +++ b/src/ws/HttpsConnection.h @@ -2,6 +2,7 @@ #define TEST_WS_HTTPSCONNECTION_H_ #include +#include #include #include #include @@ -15,10 +16,10 @@ class Context; class HttpsConnection { public: - typedef sys::Promise PromiseType; + typedef sys::Promise PromiseType; public: - HttpsConnection(Context &ctx, const char *method, const char *host, const char *path); + HttpsConnection(Context &ctx, const std::string &method, const std::string &host, const std::string &path); ~HttpsConnection() { } @@ -75,13 +76,28 @@ public: return status; } + bool IsPositive() const { + return status >= 200 && status < 400; + } + const std::string &GetBody() const { return in_buffer; } + Json::Value GetBodyJSON() const { + Json::Value json; + Json::Reader json_reader; + json_reader.parse(in_buffer, json); + return json; + } + +public: int ProtoCallback(lws_callback_reasons reason, void *in, size_t len); private: + std::string method; + std::string host; + std::string path; lws_client_connect_info info; lws *wsi; diff --git a/src/ws/PusherConnection.h b/src/ws/PusherConnection.h index b89a28f..52e578a 100644 --- a/src/ws/PusherConnection.h +++ b/src/ws/PusherConnection.h @@ -8,7 +8,7 @@ #include #include -#include "../sys/Promise.h" +#include "../sys/Callbacks.h" namespace ws { @@ -17,7 +17,7 @@ class Context; class PusherConnection { public: - typedef sys::Promise PromiseType; + typedef sys::Callbacks CallbacksType; public: explicit PusherConnection(Context &ctx); @@ -32,7 +32,7 @@ public: SendMessage("{\"event\":\"pusher:ping\"}"); } - PromiseType &Subscribe(const std::string &chan) { + CallbacksType &Subscribe(const std::string &chan) { auto it = callbacks.find(chan); if (it != callbacks.end()) { return it->second; @@ -72,7 +72,7 @@ public: Json::Value json; json_reader.parse(msg, json); const std::string channel = json["channel"].asString(); - callbacks[channel].Resolve(json); + callbacks[channel].FireNothrow(json); } private: @@ -85,7 +85,7 @@ private: Json::Reader json_reader; Json::FastWriter json_writer; - std::map callbacks; + std::map callbacks; }; diff --git a/src/ws/TwitchConnection.h b/src/ws/TwitchConnection.h index b37a0c9..3b430b9 100644 --- a/src/ws/TwitchConnection.h +++ b/src/ws/TwitchConnection.h @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -10,10 +11,11 @@ #include +#include "HttpsConnection.h" #include "../twitch/IRCMessage.h" #include "../twitch/LoginToken.h" +#include "../sys/Callbacks.h" #include "../sys/Promise.h" -#include "HttpsConnection.h" namespace ws { @@ -22,7 +24,9 @@ class Context; class TwitchConnection { public: - typedef sys::Promise PromiseType; + typedef sys::Callbacks IRCCallbacks; + typedef sys::Promise WebPromise; + typedef sys::Promise RequestPromise; public: explicit TwitchConnection(Context &ctx); @@ -37,10 +41,10 @@ public: SendMessage("CAP REQ :twitch.tv/tags twitch.tv/commands"); if (token.HasExpired()) { token.Refresh(ctx) - .Then([this](twitch::LoginToken &) -> void { + .Then([this](twitch::LoginToken *) -> void { Login(); }) - .Catch([this](twitch::LoginToken &) -> void { + .Catch([this](HttpsConnection *) -> void { std::cerr << "unable to refresh login token" << std::endl; }); } else { @@ -48,6 +52,14 @@ public: } } + bool TokenValid() const { + return !token.HasExpired(); + } + + twitch::LoginToken::PromiseType &RefreshToken() { + return token.Refresh(ctx); + } + void Ping() { SendMessage("PING localhorst.tv"); } @@ -57,7 +69,7 @@ public: SendMessage("NICK HorstieBot"); } - PromiseType &Join(const std::string &chan) { + IRCCallbacks &Join(const std::string &chan) { auto it = callbacks.find(chan); if (it != callbacks.end()) { return it->second; @@ -87,7 +99,13 @@ public: lws_callback_on_writable(wsi); } - HttpsConnection &AuthorizedRequest(const char *method, const char *host, const char *path); + void WithValidToken(std::function cb, std::function err); + + RequestPromise AuthorizedRequest(const std::string &method, const std::string &host, const std::string &path); + + WebPromise FetchClips(const std::string &from); + + WebPromise Shoutout(const std::string &from, const std::string &to); public: int ProtoCallback(lws_callback_reasons reason, void *in, size_t len); @@ -135,10 +153,14 @@ public: if (msg.params.empty()) return; auto it = callbacks.find(msg.params[0]); if (it != callbacks.end()) { - it->second.Resolve(msg); + it->second.FireNothrow(msg); } } + void SetClosed() { + closed = true; + } + private: void Connect() { wsi = lws_client_connect_via_info(&info); @@ -157,11 +179,12 @@ private: lws *wsi; bool connected; bool authenticated; + bool closed; std::string in_buffer; std::string out_buffer; - std::map callbacks; + std::map callbacks; twitch::LoginToken token; twitch::IRCMessage in_msg; -- 2.39.2