CPP_SRCS = $(shell find src -name \*.cpp)
CPP_DEPS = $(shell find src -name \*.h)
-LIBS = cairo freetype2 jsoncpp libavformat libavcodec libavutil libswresample libswscale libuv libwebsockets pangocairo
+LIBS = cairo freetype2 icu-uc jsoncpp libavformat libavcodec libavutil libswresample libswscale libuv libwebsockets pangocairo
main: $(CPP_SRCS) $(CPP_DEPS)
- clang++ -g $(shell pkg-config --cflags --libs $(LIBS)) $(CPP_SRCS) -o $@
+ clang++ -gdwarf-4 $(shell pkg-config --cflags --libs $(LIBS)) $(CPP_SRCS) -o $@
compile_flags.txt:
echo -xc++ > $@
, renderer(stream.GetVideoPlane(), stream.GetVideoLineSize(), width, height)
, state(width, height)
, own_channel_id("1020523186")
+ , enable_realtime(false)
+ , enable_shoutouts(false)
, drawing_game(renderer.GetContext(), 45, 50, { 1, 1, 1 }) {
state.SetGame(&drawing_game);
}
Application &operator =(const Application &) = delete;
public:
+ void EnableRealtime() {
+ enable_realtime = true;
+ }
+
+ void EnableShoutouts() {
+ enable_shoutouts = true;
+ }
+
void Start() {
std::cout << "starting services" << std::endl;
- pusher_conn.Subscribe("Channel").Then([this](const Json::Value &json) -> void {
+ pusher_conn.Subscribe("Channel").Listen([this](const Json::Value &json) -> void {
HandlePusherChannel(json);
});
- pusher_conn.Subscribe("ChatBotLog").Then([this](const Json::Value &json) -> void {
+ pusher_conn.Subscribe("ChatBotLog").Listen([this](const Json::Value &json) -> void {
HandlePusherChatBotLog(json);
});
- twitch_conn.Join("#horstiebot").Then([this](const twitch::IRCMessage &msg) -> void {
+ twitch_conn.Join("#horstiebot").Listen([this](const twitch::IRCMessage &msg) -> void {
HandleTwitch(msg);
});
- ws_ctx.HttpsRequest("GET", "alttp.localhorst.tv", "/api/channels?chatting=1").GetPromise().Then([this](ws::HttpsConnection &rsp) -> void {
- InitChannels(rsp);
+ ws_ctx.HttpsRequest("GET", "alttp.localhorst.tv", "/api/channels?chatting=1").GetPromise().Then([this](ws::HttpsConnection *rsp) -> void {
+ InitChannels(*rsp);
});
stream.Start();
- //Media &media = state.AddMedia("test.mp4");
- //Clock sync_point = stream.GetVideoClock();
- //sync_point.Advance(600);
- //media.SetSyncPoint(sync_point);
- //media.AddWindow({ 0, 0, 1920, 1080 }, { 600, 50, 640, 360 });
-
- //Media &media = state.AddMedia("test.mkv");
- //Clock sync_point = stream.GetVideoClock();
- //sync_point.Advance(600);
- //media.SetSyncPoint(sync_point);
- //media.AddWindow({ 0, 0, 1280, 720 }, { 600, 50, 640, 360 });
+ //Media &media_a = state.AddMedia("test.mp4");
+ //Clock sync_point_a = stream.GetVideoClock();
+ //sync_point_a.Advance(1200);
+ //media_a.SetSyncPoint(sync_point_a);
+ //media_a.AddWindow({ 0, 0, 1, 1 }, { 100, 250, 640, 360 });
+
+ //Media &media_b = state.AddMedia("test.mkv");
+ //Clock sync_point_b = stream.GetVideoClock();
+ //sync_point_b.Advance(600);
+ //media_b.SetSyncPoint(sync_point_b);
+ //media_b.AddWindow({ 0, 0, 1, 1 }, { 600, 50, 640, 360 });
}
void Step() {
stream.PrepareVideoFrame();
state.PullVideo(stream.GetVideoClock());
- if (target > 0 && difference < 0) {
+ if (enable_realtime && target > 0 && difference < 0) {
std::cout << (difference / -1000.0) << "s behind schedule, dropping frame" << std::endl;
} else {
state.Update(renderer.GetContext(), stream.GetVideoClock());
state.Clean();
- if (difference > 1000) {
+ if (enable_realtime && difference > 1000) {
std::this_thread::sleep_for(std::chrono::milliseconds(difference - 1000));
}
}
void Stop() {
+ twitch_conn.SetClosed();
ws_ctx.Shutdown();
stream.Finish();
}
ChannelInfo &info = state.GetChannelInfo(channel_id);
info.Update(channel);
}
+ ShoutoutChannel(33);
}
void HandlePusherChannel(const Json::Value &json) {
void UpdateChannel(const Json::Value &json) {
int channel_id = json["id"].asInt();
- bool is_live =json["twitch_live"].asBool();
+ bool is_live = json["twitch_live"].asBool();
bool is_known = state.IsChannelKnown(channel_id);
ChannelInfo &channel = state.GetChannelInfo(channel_id);
bool went_live = !channel.twitch_live && is_live;
if (went_live) {
// channel went live
std::cout << "channel " << channel.title << " went live" << std::endl;
- ShoutoutChannel(channel_id);
+ if (channel.chat) {
+ ShoutoutChannel(channel_id);
+ }
} else if (went_down) {
// channel went down
std::cout << "channel " << channel.title << " went down" << std::endl;
ChannelInfo &channel = state.GetChannelInfo(id);
Shoutout &shout = renderer.CreateShoutout(id, state);
if (!channel.twitch_id.empty() && channel.twitch_id != own_channel_id) {
- ws::HttpsConnection &req = twitch_conn.AuthorizedRequest("POST", "api.twitch.tv", "/helix/chat/shoutouts");
- req.SetHeader("Content-Type", "application/x-www-form-urlencoded");
- req.AddFormUrlenc("from_broadcaster_id", own_channel_id);
- req.AddFormUrlenc("to_broadcaster_id", channel.twitch_id);
- req.AddFormUrlenc("moderator_id", own_channel_id);
- req.SetContentLength();
+ if (enable_shoutouts) {
+ twitch_conn.Shoutout(own_channel_id, channel.twitch_id);
+ }
+ shout.FetchClip(twitch_conn);
}
}
}
void HandleTwitch(const twitch::IRCMessage &msg) {
+ if (msg.StartsWith("!so ") && msg.IsMod()) {
+ std::string name = msg.GetText().length() > 4 && msg.GetText()[4] == '@'
+ ? msg.GetText().substr(5) : msg.GetText().substr(4);
+ const ChannelInfo *channel = state.FindChannelInfo(name);
+ if (channel) {
+ ShoutoutChannel(channel->id);
+ }
+ }
if (state.HasGame()) {
state.GetGame().Handle(msg);
}
Renderer renderer;
State state;
std::string own_channel_id;
+ bool enable_realtime;
+ bool enable_shoutouts;
DrawingGame drawing_game;
encoder.SetSampleRate(48000);
encoder.SetSampleFormat(AV_SAMPLE_FMT_FLT);
encoder.Open();
- resampler.SetOpt("in_channel_count", decoder.GetChannelLayout().nb_channels);
+ resampler.SetOpt("in_channel_layout", decoder.GetChannelLayout());
resampler.SetOpt("in_sample_rate", decoder.GetSampleRate());
resampler.SetOpt("in_sample_fmt", decoder.GetSampleFormat());
- resampler.SetOpt("out_channel_count", encoder.GetChannelLayout().nb_channels);
+ resampler.SetOpt("out_channel_layout", encoder.GetChannelLayout());
resampler.SetOpt("out_sample_rate", encoder.GetSampleRate());
resampler.SetOpt("out_sample_fmt", encoder.GetSampleFormat());
resampler.Init();
- if (encoder.GetFrameSize() > 0) {
- output_frame.AllocateAudio(encoder.GetFrameSize(), encoder.GetSampleFormat(), encoder.GetChannelLayout());
- } else {
- output_frame.AllocateAudio(decoder.GetFrameSize(), encoder.GetSampleFormat(), encoder.GetChannelLayout());
- }
+ output_frame.AllocateAudio(resampler.GetOutSamples(decoder.GetFrameSize()), encoder.GetSampleFormat(), encoder.GetChannelLayout());
clock = Clock(encoder.GetTimeBase());
}
~AudioReceiver() {
bool res = decoder.ReceiveFrame(input_frame);
if (res) {
ready = true;
- int converted = resampler.Convert(encoder, input_frame, output_frame);
+ int converted = resampler.Convert(decoder, input_frame, encoder, output_frame);
Buffer(converted);
clock.Advance(converted);
}
: id(json["id"].asInt())
, title(json["title"].asString())
, twitch_id(json["twitch_id"].asString())
+ , twitch_chat(json["twitch_chat"].asString())
, twitch_title(json["twitch_title"].asString())
, twitch_category(json["twitch_category_name"].asString())
, chat(json["chat"].asBool())
}
title = json["title"].asString();
twitch_id = json["twitch_id"].asString();
+ twitch_chat = json["twitch_chat"].asString();
twitch_title = json["twitch_title"].asString();
twitch_category = json["twitch_category_name"].asString();
chat = json["chat"].asBool();
int id;
std::string title;
std::string twitch_id;
+ std::string twitch_chat;
std::string twitch_title;
std::string twitch_category;
bool chat;
}
void AddWindow(const gfx::Rectangle &src, const gfx::Rectangle &dst) {
- windows.push_back({ src, dst });
+ windows.push_back({ src.ScaleTo(source.GetVideoSize()), dst });
}
void PullAudio(const Clock &clock, int frame_size) {
--- /dev/null
+#include "Shoutout.h"
+
+#include <random>
+
+#include "State.h"
+
+namespace app {
+
+void Shoutout::LoadRandomClip(const Json::Value &json) {
+ const Json::Value &data = json["data"];
+ if (!data.isArray()) return;
+ if (data.empty()) return;
+ Json::ArrayIndex num = data.size();
+ std::uniform_int_distribution<std::mt19937::result_type> dist(0, num - 1);
+ Json::ArrayIndex choice = dist(state.GetRNG());
+ const Json::Value &clip_json = data[choice];
+ clip = twitch::Clip(clip_json);
+}
+
+void Shoutout::Start(const Clock &clock) {
+ start_time = clock;
+ running = true;
+ if (clip.HasVideo()) {
+ std::cout << "adding clip " << clip.GetVideoURL() << " at " << clock << std::endl;
+ Media &media = state.AddMedia(clip.GetVideoURL().c_str());
+ media.SetSyncPoint(clock);
+ media.AddWindow({ 0, 0, 1, 1 }, { 320, 150, 640, 360 });
+ }
+}
+
+}
#include "../gfx/ColorRGB.h"
#include "../gfx/Position.h"
#include "../gfx/Spacing.h"
+#include "../twitch/Clip.h"
+#include "../ws/TwitchConnection.h"
+#include "json/forwards.h"
+#include "json/value.h"
#include <cstdint>
namespace app {
+class State;
+
class Shoutout {
public:
- Shoutout(const ChannelInfo &channel, cairo::Context &ctx)
+ Shoutout(const ChannelInfo &channel, cairo::Context &ctx, State &state)
: channel(channel)
+ , state(state)
, title_layout(ctx.CreateLayout())
, channel_layout(ctx.CreateLayout())
, category_layout(ctx.CreateLayout())
, category_color{ 0.6, 0.6, 0.6 }
, start_time()
, running(false)
- , done(false) {
+ , done(false)
+ , fetching_clip(false) {
title_layout.SetText(channel.twitch_title);
channel_layout.SetText(channel.title);
category_layout.SetText(channel.twitch_category);
public:
bool Loading() const {
- return false;
+ return fetching_clip;
}
bool Running() const {
category_layout.SetFont(font);
}
- void Start(const Clock &clock) {
- start_time = clock;
- running = true;
+ void FetchClip(ws::TwitchConnection &twitch) {
+ fetching_clip = true;
+ twitch.FetchClips(channel.twitch_id)
+ .Then([this](const Json::Value *rsp) -> void {
+ fetching_clip = false;
+ LoadRandomClip(*rsp);
+ })
+ .Catch([this](ws::HttpsConnection *rsp) -> void {
+ fetching_clip = false;
+ std::cout << "failed to fetch clips" << std::endl;
+ });
}
+ void LoadRandomClip(const Json::Value &json);
+
+ void Start(const Clock &clock);
+
void Update(cairo::Context &ctx, const Clock &clock) {
const Clock runtime = clock.Difference(start_time);
int64_t ms = runtime.GetMS();
private:
const ChannelInfo &channel;
+ State &state;
pango::Layout title_layout;
pango::Layout channel_layout;
gfx::Size size;
gfx::Spacing padding;
+ twitch::Clip clip;
+
Clock start_time;
bool running;
bool done;
+ bool fetching_clip;
};
return audio.IsEOF() && video.IsEOF();
}
+ gfx::Size GetVideoSize() const {
+ return video.GetSize();
+ }
+
cairo::Surface GetVideoSurface() {
return video.GetSurface();
}
#include <list>
#include <map>
#include <ostream>
+#include <random>
+#include <unicode/unistr.h>
#include "ChannelInfo.h"
#include "Clock.h"
public:
State(int width, int height)
- : width(width), height(height), game(nullptr) {
+ : width(width), height(height), game(nullptr), rnd_gen(rnd_dev()) {
}
public:
return info;
}
+ ChannelInfo *FindChannelInfo(const std::string &str) {
+ for (auto &c : channels) {
+ if (strcasecmp(c.second.title.c_str(), str.c_str()) == 0) {
+ return &c.second;
+ }
+ if (c.second.twitch_chat.length() > 1 && strcasecmp(c.second.twitch_chat.c_str() + 1, str.c_str()) == 0) {
+ return &c.second;
+ }
+ }
+ return nullptr;
+ }
+
const std::list<Media> &GetMedia() const {
return media;
}
}
Shoutout &AddShoutout(int channel_id, cairo::Context &ctx) {
- shoutouts.emplace_back(GetChannelInfo(channel_id), ctx);
+ shoutouts.emplace_back(GetChannelInfo(channel_id), ctx, *this);
return shoutouts.back();
}
return height;
}
+ std::mt19937 &GetRNG() {
+ return rnd_gen;
+ }
+
void PullAudio(const Clock &clock, int frame_size) {
for (Media &m : media) {
m.PullAudio(clock, frame_size);
std::list<Message> msgs;
std::list<Shoutout> shoutouts;
+ std::random_device rnd_dev;
+ std::mt19937 rnd_gen;
+
};
}
}
void PushAudioFrame() {
- resampler.Convert(audio_encoder, audio_input_frame, audio_output_frame);
+ resampler.Convert(audio_encoder, audio_input_frame, audio_encoder, audio_output_frame);
audio_output_frame.SetPresentationTimestamp(audio_clock.GetCounter());
audio_encoder.SendFrame(audio_output_frame);
while (audio_encoder.ReceivePacket(audio_packet)) {
return ready;
}
+ gfx::Size GetSize() const {
+ return gfx::Size{ double(decoder.GetWidth()), double(decoder.GetHeight()) };
+ }
+
cairo::Surface GetSurface() {
return cairo::Surface(
output_frame.GetDataPlane(0), output_frame.GetPlaneLinesize(0), CAIRO_FORMAT_ARGB32,
#define TEST_FFMPEG_DECODER_H_
#include <cerrno>
+#include <libavutil/frame.h>
extern "C" {
#include <libavcodec/avcodec.h>
}
}
+ const AVChannelLayout &GetChannelLayout() const {
+ return frame->ch_layout;
+ }
+
uint8_t **GetData() {
return frame->data;
}
return frame->nb_samples;
}
- void SetChannelLayout(AVChannelLayout layout) {
+ AVSampleFormat GetSampleFormat() const {
+ return static_cast<AVSampleFormat>(frame->format);
+ }
+
+ int GetSampleRate() const {
+ return frame->sample_rate;
+ }
+
+ void SetChannelLayout(const AVChannelLayout &layout) {
frame->ch_layout = layout;
}
#include "CodecContext.h"
#include <cstdint>
+#include <iostream>
+#include <openssl/conf.h>
#include <stdexcept>
extern "C" {
+#include <libavutil/channel_layout.h>
#include <libavutil/mathematics.h>
#include <libavutil/opt.h>
#include <libavutil/samplefmt.h>
Resampler &operator =(const Resampler &) = delete;
public:
- int Convert(const CodecContext &codec, const Frame &src, Frame &dst) {
- int64_t from = swr_get_delay(ctx, codec.GetSampleRate()) + src.GetSamples();
- int64_t nb_samples = av_rescale_rnd(from, codec.GetSampleRate(), codec.GetSampleRate(), AV_ROUND_UP);
- int res = swr_convert(ctx, dst.GetData(), nb_samples, src.GetData(), src.GetSamples());
+ int Convert(const CodecContext &src_codec, const Frame &src, const CodecContext &dst_codec, Frame &dst) {
+ //int64_t from = swr_get_delay(ctx, src_codec.GetSampleRate()) + src.GetSamples();
+ //int64_t nb_samples = av_rescale_rnd(from, dst_codec.GetSampleRate(), src_codec.GetSampleRate(), AV_ROUND_UP);
+ //dst.EnsureAudioAllocated(nb_samples);
+ int res = swr_convert(ctx, dst.GetData(), dst.GetSamples(), src.GetData(), src.GetSamples());
+ if (res > dst.GetSamples()) {
+ std::cout << "ERROR: wrote more samples than space available!" << std::endl;
+ }
if (res < 0) {
throw Error("failed to resample", res);
}
}
}
+ int GetOutSamples(int in_samples) const {
+ int out_samples = swr_get_out_samples(ctx, in_samples);
+ if (out_samples < 0) {
+ throw Error("unable to calculate output samples", out_samples);
+ }
+ return out_samples;
+ }
+
void SetOpt(const char *name, int value) {
int res = av_opt_set_int(ctx, name, value, 0);
if (res != 0) {
}
}
+ void SetOpt(const char *name, const AVChannelLayout &value) {
+ int res = av_opt_set_chlayout(ctx, name, &value, 0);
+ if (res != 0) {
+ throw Error("failed to set option", res);
+ }
+ }
+
private:
SwrContext *ctx;
#define TEST_GFX_RECTANGLE_H_
#include "Position.h"
+#include "Size.h"
namespace gfx {
return gfx::Position{x, y};
}
+ Rectangle ScaleTo(const Size &size) const {
+ return Rectangle{ x * size.w, y * size.h, w * size.w, h * size.h };
+ }
+
};
}
#include <csignal>
+#include <cstring>
#include <iostream>
#include "app/Application.h"
const char *URL = argc > 1 ? argv[1] : "rtmp://localhost/localhorsttv";
app::Application app(WIDTH, HEIGHT, FPS, URL);
+ app.EnableRealtime();
+ if (std::strcmp(URL, "rtmp://localhost/horstiebot") == 0) {
+ app.EnableShoutouts();
+ }
signal(SIGINT, stop);
--- /dev/null
+#ifndef TEST_SYS_CALLBACKS_H_
+#define TEST_SYS_CALLBACKS_H_
+
+#include <exception>
+#include <functional>
+#include <iostream>
+#include <vector>
+
+namespace sys {
+
+template<typename ...Args>
+class Callbacks {
+
+public:
+ typedef std::function<void(Args...)> Callback;
+
+public:
+ Callbacks<Args...> &Listen(Callback callback) {
+ callbacks.push_back(callback);
+ return *this;
+ }
+
+public:
+ void Fire(Args... args) {
+ for (Callback &callback : callbacks) {
+ callback(args...);
+ }
+ }
+
+ void FireNothrow(Args... args) {
+ for (Callback &callback : callbacks) {
+ try {
+ callback(args...);
+ } catch (const std::exception &e) {
+ std::cerr << "exception in nothrow callback: " << e.what() << std::endl;
+ } catch (...) {
+ std::cerr << "exception in nothrow callback" << std::endl;
+ }
+ }
+ }
+
+private:
+ std::vector<Callback> callbacks;
+
+};
+
+}
+
+#endif
#include <exception>
#include <functional>
#include <iostream>
+#include <memory>
+#include <type_traits>
#include <vector>
namespace sys {
-template<typename ...Args>
+template<typename ResultType, typename ErrorType = ResultType>
class Promise {
+ static_assert(!std::is_reference<ResultType>::value, "promise result type must not be reference");
+ static_assert(!std::is_reference<ErrorType>::value, "promise error type must not be reference");
+
public:
- typedef std::function<void(Args...)> Callback;
+ typedef std::function<void(ResultType)> ResultCallback;
+ typedef std::function<void(ErrorType)> ErrorCallback;
+ typedef Promise<ResultType, ErrorType> SelfType;
public:
- Promise<Args...> &Then(Callback callback) {
- success.push_back(callback);
+ Promise(): data(std::make_shared<Data>()) {
+ }
+
+public:
+ SelfType &Then(ResultCallback callback) {
+ if (data->status == PENDING) {
+ data->on_success.push_back(callback);
+ }
+ if (data->status == RESOLVED) {
+ RunResultCallback(callback);
+ }
return *this;
}
- Promise<Args...> &Catch(Callback callback) {
- error.push_back(callback);
+ SelfType &Catch(ErrorCallback callback) {
+ if (data->status == PENDING) {
+ data->on_error.push_back(callback);
+ }
+ if (data->status == REJECTED) {
+ RunErrorCallback(callback);
+ }
return *this;
}
public:
- void Resolve(Args... args) {
- for (Callback &callback : success) {
- try {
- callback(args...);
- } catch (const std::exception &e) {
- std::cerr << "exception in promise resolution: " << e.what() << std::endl;
- } catch (...) {
- std::cerr << "exception in promise resolution" << std::endl;
- }
+ void Resolve(ResultType result) {
+ if (data->status != PENDING) {
+ std::cout << "resolution of completed promise" << std::endl;
+ return;
+ }
+ data->result = result;
+ data->status = RESOLVED;
+ for (ResultCallback &callback : data->on_success) {
+ RunResultCallback(callback);
}
+ data->on_success.clear();
+ data->on_error.clear();
}
- void Reject(Args... args) {
- for (Callback &callback : error) {
- try {
- callback(args...);
- } catch (const std::exception &e) {
- std::cerr << "exception in promise rejection: " << e.what() << std::endl;
- } catch (...) {
- std::cerr << "exception in promise rejection" << std::endl;
- }
+ void Reject(ErrorType error) {
+ if (data->status != PENDING) {
+ std::cout << "rejection of completed promise" << std::endl;
+ return;
+ }
+ data->error = error;
+ data->status = REJECTED;
+ for (ErrorCallback &callback : data->on_error) {
+ RunErrorCallback(callback);
+ }
+ data->on_success.clear();
+ data->on_error.clear();
+ }
+
+private:
+ void RunResultCallback(ResultCallback &callback) {
+ try {
+ callback(data->result);
+ } catch (const std::exception &e) {
+ std::cerr << "exception in promise resolution: " << e.what() << std::endl;
+ } catch (...) {
+ std::cerr << "exception in promise resolution" << std::endl;
}
}
+ void RunErrorCallback(ErrorCallback &callback) {
+ try {
+ callback(data->error);
+ } catch (const std::exception &e) {
+ std::cerr << "exception in promise rejection: " << e.what() << std::endl;
+ } catch (...) {
+ std::cerr << "exception in promise rejection" << std::endl;
+ }
+ }
+private:
+ enum Status { PENDING, RESOLVED, REJECTED };
+ struct Data {
+ std::vector<ResultCallback> on_success;
+ std::vector<ErrorCallback> on_error;
+ ResultType result;
+ ErrorType error;
+ Status status = PENDING;
+ };
+
private:
- std::vector<Callback> success;
- std::vector<Callback> error;
+ std::shared_ptr<Data> data;
};
--- /dev/null
+#ifndef TEST_TWITCH_CLIP_H_
+#define TEST_TWITCH_CLIP_H_
+
+#include <iostream>
+#include <json/json.h>
+
+namespace twitch {
+
+class Clip {
+
+public:
+ Clip() {
+ }
+ explicit Clip(const Json::Value &json)
+ : broadcaster_name(json["broadcaster_name"].asString())
+ , creator_name(json["creator_name"].asString())
+ , thumbnail_url(json["thumbnail_url"].asString())
+ , title(json["title"].asString()) {
+ std::cout << "clip: " << json << std::endl;
+ size_t thumb_pos = thumbnail_url.find("-preview-");
+ if (thumb_pos != std::string::npos) {
+ video_url = thumbnail_url.substr(0, thumb_pos);
+ video_url += ".mp4";
+ }
+ }
+
+public:
+ bool HasVideo() const {
+ return !video_url.empty();
+ }
+
+ const std::string &GetVideoURL() const {
+ return video_url;
+ }
+
+private:
+ std::string broadcaster_name;
+ std::string creator_name;
+ std::string thumbnail_url;
+ std::string title;
+ std::string video_url;
+
+};
+
+}
+
+#endif
namespace twitch {
+std::string IRCMessage::THE_EMPTY_STRING;
+
void IRCMessage::Decode(std::string::const_iterator begin, std::string::const_iterator input_end) {
command.clear();
params.clear();
void Decode(std::string::const_iterator begin, std::string::const_iterator end);
void Encode(std::string &out) const;
- std::string GetText() const {
- return params.empty() ? "" : params.back();
+ const std::string &GetTag(const std::string &name) const {
+ auto it = tags.find(name);
+ return it != tags.end() ? it->second : THE_EMPTY_STRING;
+ }
+
+ const std::string &GetTarget() const {
+ return params.empty() ? THE_EMPTY_STRING : params.front();
+ }
+
+ const std::string &GetText() const {
+ return params.empty() ? THE_EMPTY_STRING : params.back();
+ }
+
+ bool HasTag(const std::string &name) const {
+ return tags.find(name) != tags.end();
}
bool IsLoginSuccess() const {
return command == "PRIVMSG";
}
+ bool IsOwner() const {
+ return GetTarget().substr(1) == nick;
+ }
+
+ bool IsMod() const {
+ return IsOwner() || GetTag("mod") == "1";
+ }
+
+ bool StartsWith(char c) const {
+ return !GetText().empty() && GetText()[0] == c;
+ }
+
+ bool StartsWith(const char *str) const {
+ return GetText().rfind(str, 0) == 0;
+ }
+
+ bool StartsWith(const std::string &str) const {
+ return GetText().rfind(str, 0) == 0;
+ }
+
IRCMessage MakePong() const {
IRCMessage pong;
pong.command = "PONG";
std::string server;
std::map<std::string, std::string> tags;
+private:
+ static std::string THE_EMPTY_STRING;
+
};
inline std::ostream &operator <<(std::ostream &out, const IRCMessage &msg) {
req.AddFormUrlenc("refresh_token", refresh_token);
req.SetContentLength();
req.GetPromise()
- .Then([this](ws::HttpsConnection &rsp) -> void {
- HandleRefreshComplete(rsp);
+ .Then([this](ws::HttpsConnection *rsp) -> void {
+ HandleRefreshComplete(*rsp);
})
- .Catch([this](ws::HttpsConnection &rsp) -> void {
- HandleRefreshError(rsp);
+ .Catch([this](ws::HttpsConnection *rsp) -> void {
+ HandleRefreshError(*rsp);
});
return promise;
}
std::time(&now);
expires = now + expires_in;
Save();
- promise.Resolve(*this);
+ promise.Resolve(this);
}
void LoginToken::HandleRefreshError(ws::HttpsConnection &rsp) {
is_refreshing = false;
std::cout << "errored https request with status " << rsp.GetStatus() << std::endl;
std::cout << "body: " << rsp.GetBody() << std::endl;
- promise.Reject(*this);
+ promise.Reject(&rsp);
}
}
class LoginToken {
public:
- typedef sys::Promise<LoginToken &> PromiseType;
+ typedef sys::Promise<LoginToken *, ws::HttpsConnection *> PromiseType;
public:
LoginToken(): expires(0), is_refreshing(false) {
out << json << std::endl;
}
- bool HasExpired() {
+ bool HasExpired() const {
time_t now;
std::time(&now);
return expires < now;
#include <cstdio>
#include <iostream>
+#include <json/json.h>
#include <libwebsockets.h>
#include <stdexcept>
namespace ws {
-HttpsConnection::HttpsConnection(Context &ctx, const char *method, const char *host, const char *path)
-: info{0}, wsi(nullptr), read_buffer{0}, status(0) {
+HttpsConnection::HttpsConnection(Context &ctx, const std::string &m, const std::string &h, const std::string &p)
+: method(m), host(h), path(p), info{0}, wsi(nullptr), read_buffer{0}, status(0) {
info.context = ctx.GetContext();
info.opaque_user_data = this;
- info.address = host;
+ info.address = host.c_str();
info.port = 443;
info.ssl_connection = 1;
- info.path = path;
- info.host = host;
+ info.path = path.c_str();
+ info.host = host.c_str();
info.origin = "test";
- info.method = method;
+ info.method = method.c_str();
info.protocol = "https";
info.ietf_version_or_minus_one = -1;
info.userdata = &ctx;
int HttpsConnection::ProtoCallback(lws_callback_reasons reason, void *in, size_t len) {
switch (reason) {
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
- promise.Reject(*this);
+ promise.Reject(this);
break;
case LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP:
status = lws_http_client_http_response(wsi);
}
break;
case LWS_CALLBACK_COMPLETED_CLIENT_HTTP:
- promise.Resolve(*this);
+ promise.Resolve(this);
break;
case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
if (!lws_http_is_redirected_to_get(wsi)) {
}
}
break;
- case LWS_CALLBACK_WSI_CREATE:
- case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
- case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
- case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH:
- case LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL:
- case LWS_CALLBACK_CLOSED_CLIENT_HTTP:
- break;
default:
std::cout << "unhandled https connection proto callback, reason: " << reason << ", in: " << in << ", len: " << len << std::endl;
if (in && len) {
Ping();
lws_set_timer_usecs(wsi, 30000000);
break;
- case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
- case LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL:
- case LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL:
- case LWS_CALLBACK_WS_CLIENT_BIND_PROTOCOL:
- case LWS_CALLBACK_WS_CLIENT_DROP_PROTOCOL:
- case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
- case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
- case LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP:
- case LWS_CALLBACK_CLOSED_CLIENT_HTTP:
- case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
- case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH:
- case LWS_CALLBACK_WSI_CREATE:
- break;
default:
std::cout << "unhandled pusher connection proto callback, reason: " << reason << ", in: " << in << ", len: " << len << std::endl;
if (in && len) {
}
TwitchConnection::TwitchConnection(Context &ctx)
-: ctx(ctx), info{0}, wsi(nullptr), connected(false), authenticated(false) {
+: ctx(ctx), info{0}, wsi(nullptr), connected(false), authenticated(false), closed(false) {
info.context = ctx.GetContext();
info.opaque_user_data = this;
// wss://irc-ws.chat.twitch.tv:443
connected = false;
authenticated = false;
std::cout << "twitch connection closed" << std::endl;
- Connect();
+ if (!closed) {
+ Connect();
+ }
break;
case LWS_CALLBACK_CLIENT_RECEIVE:
if (lws_is_first_fragment(wsi)) {
Ping();
lws_set_timer_usecs(wsi, 60000000);
break;
- case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
- case LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL:
- case LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL:
- case LWS_CALLBACK_WS_CLIENT_BIND_PROTOCOL:
- case LWS_CALLBACK_WS_CLIENT_DROP_PROTOCOL:
- case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
- case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
- case LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP:
- case LWS_CALLBACK_CLOSED_CLIENT_HTTP:
- case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
- case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH:
- case LWS_CALLBACK_WSI_CREATE:
- break;
default:
std::cout << "unhandled twitch connection proto callback, reason: " << reason << ", in: " << in << ", len: " << len << std::endl;
if (in && len) {
return 0;
}
-HttpsConnection &TwitchConnection::AuthorizedRequest(const char *method, const char *host, const char *path) {
- HttpsConnection &req = ctx.HttpsRequest(method, host, path);
- req.SetHeader("Authorization", "Bearer " + token.GetAccessToken());
- req.SetHeader("Client-Id", token.GetClientId());
- return req;
+TwitchConnection::RequestPromise TwitchConnection::AuthorizedRequest(const std::string &method, const std::string &host, const std::string &path) {
+ RequestPromise promise;
+ if (TokenValid()) {
+ HttpsConnection &req = ctx.HttpsRequest(method.c_str(), host.c_str(), path.c_str());
+ req.SetHeader("Authorization", "Bearer " + token.GetAccessToken());
+ req.SetHeader("Client-Id", token.GetClientId());
+ promise.Resolve(&req);
+ } else {
+ token.Refresh(ctx)
+ .Then([=](twitch::LoginToken *token) mutable -> void {
+ HttpsConnection &req = ctx.HttpsRequest(method.c_str(), host.c_str(), path.c_str());
+ req.SetHeader("Authorization", "Bearer " + token->GetAccessToken());
+ req.SetHeader("Client-Id", token->GetClientId());
+ promise.Resolve(&req);
+ }).Catch([=](HttpsConnection *rsp) mutable -> void {
+ promise.Reject(rsp);
+ });
+ }
+ return promise;
+}
+
+TwitchConnection::WebPromise TwitchConnection::FetchClips(const std::string &from) {
+ WebPromise promise;
+ std::string path = "/helix/clips?first=100&broadcaster_id=" + from;
+ AuthorizedRequest("GET", "api.twitch.tv", path).Then([=](HttpsConnection *req) -> void {
+ req->SetContentLength();
+ req->GetPromise()
+ .Then([=](HttpsConnection *rsp) mutable -> void {
+ if (rsp->IsPositive()) {
+ Json::Value json = rsp->GetBodyJSON();
+ promise.Resolve(&json);
+ } else {
+ promise.Reject(rsp);
+ }
+ })
+ .Catch([=](HttpsConnection *rsp) mutable -> void {
+ promise.Reject(rsp);
+ });
+ });
+ return promise;
+}
+
+TwitchConnection::WebPromise TwitchConnection::Shoutout(const std::string &from, const std::string &to) {
+ WebPromise promise;
+ AuthorizedRequest("POST", "api.twitch.tv", "/helix/chat/shoutouts").Then([=](HttpsConnection *req) -> void {
+ req->SetHeader("Content-Type", "application/x-www-form-urlencoded");
+ req->AddFormUrlenc("from_broadcaster_id", from);
+ req->AddFormUrlenc("to_broadcaster_id", to);
+ req->AddFormUrlenc("moderator_id", from);
+ req->SetContentLength();
+ req->GetPromise()
+ .Then([this, promise](HttpsConnection *rsp) mutable -> void {
+ if (rsp->IsPositive()) {
+ Json::Value json = rsp->GetBodyJSON();
+ promise.Resolve(&json);
+ } else {
+ promise.Reject(rsp);
+ }
+ })
+ .Catch([this, promise](HttpsConnection *rsp) mutable -> void {
+ promise.Reject(rsp);
+ });
+ });
+ return promise;
}
}
private:
static int https_callback(lws *wsi, lws_callback_reasons reason, void *user, void *in, size_t len) {
+ Context *ctx = static_cast<Context *>(user);
void *user_data = lws_get_opaque_user_data(wsi);
- Context *c = static_cast<Context *>(user);
- if (user_data) {
- HttpsConnection *conn = static_cast<HttpsConnection *>(user_data);
- if (reason == LWS_CALLBACK_WSI_DESTROY) {
- c->RemoveHttpConnection(conn);
- return 0;
- }
- return conn->ProtoCallback(reason, in, len);
- }
- if (c) {
- return c->HttpsCallback(reason, in, len);
- }
- return 0;
- }
-
- int HttpsCallback(lws_callback_reasons reason, void *in, size_t len) {
+ HttpsConnection *conn = static_cast<HttpsConnection *>(user_data);
switch (reason) {
- case LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL:
+ case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
+ case LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP:
+ case LWS_CALLBACK_RECEIVE_CLIENT_HTTP_READ:
+ case LWS_CALLBACK_RECEIVE_CLIENT_HTTP:
+ case LWS_CALLBACK_COMPLETED_CLIENT_HTTP:
+ case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
+ case LWS_CALLBACK_CLIENT_HTTP_WRITEABLE:
+ return conn->ProtoCallback(reason, in, len);
+ case LWS_CALLBACK_CLOSED_CLIENT_HTTP:
+ ctx->RemoveHttpConnection(conn);
+ break;
case LWS_CALLBACK_PROTOCOL_INIT:
case LWS_CALLBACK_PROTOCOL_DESTROY:
+ case LWS_CALLBACK_WSI_CREATE:
+ case LWS_CALLBACK_WSI_DESTROY:
+ case LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL:
case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS:
+ case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
+ case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
+ case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH:
+ case LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL:
+ case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
break;
default:
std::cout << "unhandled generic https proto callback, reason: " << reason << ", in: " << in << ", len: " << len << std::endl;
}
static int pusher_callback(lws *wsi, lws_callback_reasons reason, void *user, void *in, size_t len) {
+ Context *ctx = static_cast<Context *>(user);
void *user_data = lws_get_opaque_user_data(wsi);
- if (user_data) {
- PusherConnection *conn = static_cast<PusherConnection *>(user_data);
- return conn->ProtoCallback(reason, in, len);
- }
- if (user) {
- Context *c = static_cast<Context *>(user);
- return c->PusherCallback(reason, in, len);
- }
- return 0;
- }
-
- int PusherCallback(lws_callback_reasons reason, void *in, size_t len) {
+ PusherConnection *conn = static_cast<PusherConnection *>(user_data);
switch (reason) {
+ case LWS_CALLBACK_CLIENT_ESTABLISHED:
+ case LWS_CALLBACK_CLIENT_CLOSED:
+ case LWS_CALLBACK_CLIENT_RECEIVE:
+ case LWS_CALLBACK_CLIENT_WRITEABLE:
+ case LWS_CALLBACK_TIMER:
+ return conn->ProtoCallback(reason, in, len);
+ case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
case LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL:
+ case LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL:
+ case LWS_CALLBACK_WS_CLIENT_BIND_PROTOCOL:
+ case LWS_CALLBACK_WS_CLIENT_DROP_PROTOCOL:
+ case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
+ case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
+ case LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP:
+ case LWS_CALLBACK_CLOSED_CLIENT_HTTP:
+ case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
+ case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH:
+ case LWS_CALLBACK_WSI_CREATE:
case LWS_CALLBACK_PROTOCOL_INIT:
case LWS_CALLBACK_PROTOCOL_DESTROY:
case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS:
+ case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
break;
default:
std::cout << "unhandled generic pusher proto callback, reason: " << reason << ", in: " << in << ", len: " << len << std::endl;
}
static int twitch_callback(lws *wsi, lws_callback_reasons reason, void *user, void *in, size_t len) {
+ Context *ctx = static_cast<Context *>(user);
void *user_data = lws_get_opaque_user_data(wsi);
- if (user_data) {
- TwitchConnection *conn = static_cast<TwitchConnection *>(user_data);
- return conn->ProtoCallback(reason, in, len);
- }
- if (user) {
- Context *c = static_cast<Context *>(user);
- return c->TwitchCallback(reason, in, len);
- }
- return 0;
- }
-
- int TwitchCallback(lws_callback_reasons reason, void *in, size_t len) {
+ TwitchConnection *conn = static_cast<TwitchConnection *>(user_data);
switch (reason) {
+ case LWS_CALLBACK_CLIENT_ESTABLISHED:
+ case LWS_CALLBACK_CLIENT_CLOSED:
+ case LWS_CALLBACK_CLIENT_RECEIVE:
+ case LWS_CALLBACK_CLIENT_WRITEABLE:
+ case LWS_CALLBACK_TIMER:
+ return conn->ProtoCallback(reason, in, len);
+ case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
case LWS_CALLBACK_CLIENT_HTTP_BIND_PROTOCOL:
+ case LWS_CALLBACK_CLIENT_HTTP_DROP_PROTOCOL:
+ case LWS_CALLBACK_WS_CLIENT_BIND_PROTOCOL:
+ case LWS_CALLBACK_WS_CLIENT_DROP_PROTOCOL:
+ case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
+ case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
+ case LWS_CALLBACK_ESTABLISHED_CLIENT_HTTP:
+ case LWS_CALLBACK_CLOSED_CLIENT_HTTP:
+ case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
+ case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH:
+ case LWS_CALLBACK_WSI_CREATE:
case LWS_CALLBACK_PROTOCOL_INIT:
case LWS_CALLBACK_PROTOCOL_DESTROY:
case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS:
+ case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
break;
default:
std::cout << "unhandled generic twitch proto callback, reason: " << reason << ", in: " << in << ", len: " << len << std::endl;
#define TEST_WS_HTTPSCONNECTION_H_
#include <cstdio>
+#include <json/json.h>
#include <libwebsockets.h>
#include <map>
#include <string>
class HttpsConnection {
public:
- typedef sys::Promise<HttpsConnection &> PromiseType;
+ typedef sys::Promise<HttpsConnection *> PromiseType;
public:
- HttpsConnection(Context &ctx, const char *method, const char *host, const char *path);
+ HttpsConnection(Context &ctx, const std::string &method, const std::string &host, const std::string &path);
~HttpsConnection() {
}
return status;
}
+ bool IsPositive() const {
+ return status >= 200 && status < 400;
+ }
+
const std::string &GetBody() const {
return in_buffer;
}
+ Json::Value GetBodyJSON() const {
+ Json::Value json;
+ Json::Reader json_reader;
+ json_reader.parse(in_buffer, json);
+ return json;
+ }
+
+public:
int ProtoCallback(lws_callback_reasons reason, void *in, size_t len);
private:
+ std::string method;
+ std::string host;
+ std::string path;
lws_client_connect_info info;
lws *wsi;
#include <json/json.h>
#include <libwebsockets.h>
-#include "../sys/Promise.h"
+#include "../sys/Callbacks.h"
namespace ws {
class PusherConnection {
public:
- typedef sys::Promise<const Json::Value &> PromiseType;
+ typedef sys::Callbacks<const Json::Value &> CallbacksType;
public:
explicit PusherConnection(Context &ctx);
SendMessage("{\"event\":\"pusher:ping\"}");
}
- PromiseType &Subscribe(const std::string &chan) {
+ CallbacksType &Subscribe(const std::string &chan) {
auto it = callbacks.find(chan);
if (it != callbacks.end()) {
return it->second;
Json::Value json;
json_reader.parse(msg, json);
const std::string channel = json["channel"].asString();
- callbacks[channel].Resolve(json);
+ callbacks[channel].FireNothrow(json);
}
private:
Json::Reader json_reader;
Json::FastWriter json_writer;
- std::map<std::string, PromiseType> callbacks;
+ std::map<std::string, CallbacksType> callbacks;
};
#include <algorithm>
#include <cstring>
+#include <functional>
#include <iostream>
#include <map>
#include <string>
#include <libwebsockets.h>
+#include "HttpsConnection.h"
#include "../twitch/IRCMessage.h"
#include "../twitch/LoginToken.h"
+#include "../sys/Callbacks.h"
#include "../sys/Promise.h"
-#include "HttpsConnection.h"
namespace ws {
class TwitchConnection {
public:
- typedef sys::Promise<const twitch::IRCMessage &> PromiseType;
+ typedef sys::Callbacks<const twitch::IRCMessage &> IRCCallbacks;
+ typedef sys::Promise<const Json::Value *, HttpsConnection *> WebPromise;
+ typedef sys::Promise<HttpsConnection *> RequestPromise;
public:
explicit TwitchConnection(Context &ctx);
SendMessage("CAP REQ :twitch.tv/tags twitch.tv/commands");
if (token.HasExpired()) {
token.Refresh(ctx)
- .Then([this](twitch::LoginToken &) -> void {
+ .Then([this](twitch::LoginToken *) -> void {
Login();
})
- .Catch([this](twitch::LoginToken &) -> void {
+ .Catch([this](HttpsConnection *) -> void {
std::cerr << "unable to refresh login token" << std::endl;
});
} else {
}
}
+ bool TokenValid() const {
+ return !token.HasExpired();
+ }
+
+ twitch::LoginToken::PromiseType &RefreshToken() {
+ return token.Refresh(ctx);
+ }
+
void Ping() {
SendMessage("PING localhorst.tv");
}
SendMessage("NICK HorstieBot");
}
- PromiseType &Join(const std::string &chan) {
+ IRCCallbacks &Join(const std::string &chan) {
auto it = callbacks.find(chan);
if (it != callbacks.end()) {
return it->second;
lws_callback_on_writable(wsi);
}
- HttpsConnection &AuthorizedRequest(const char *method, const char *host, const char *path);
+ void WithValidToken(std::function<void(void)> cb, std::function<void(void)> err);
+
+ RequestPromise AuthorizedRequest(const std::string &method, const std::string &host, const std::string &path);
+
+ WebPromise FetchClips(const std::string &from);
+
+ WebPromise Shoutout(const std::string &from, const std::string &to);
public:
int ProtoCallback(lws_callback_reasons reason, void *in, size_t len);
if (msg.params.empty()) return;
auto it = callbacks.find(msg.params[0]);
if (it != callbacks.end()) {
- it->second.Resolve(msg);
+ it->second.FireNothrow(msg);
}
}
+ void SetClosed() {
+ closed = true;
+ }
+
private:
void Connect() {
wsi = lws_client_connect_via_info(&info);
lws *wsi;
bool connected;
bool authenticated;
+ bool closed;
std::string in_buffer;
std::string out_buffer;
- std::map<std::string, PromiseType> callbacks;
+ std::map<std::string, IRCCallbacks> callbacks;
twitch::LoginToken token;
twitch::IRCMessage in_msg;