From: Daniel Karbach Date: Fri, 25 Nov 2016 10:54:11 +0000 (+0100) Subject: timeouts reading from spawned processes X-Git-Url: https://git.localhorst.tv/?a=commitdiff_plain;h=7f829070c9a5e4e036b483863e5ee75a3a824c38;p=blank.git timeouts reading from spawned processes --- diff --git a/src/app/Process.hpp b/src/app/Process.hpp index 732fd2f..0e84ccd 100644 --- a/src/app/Process.hpp +++ b/src/app/Process.hpp @@ -38,15 +38,19 @@ public: /// read from the process' output stream /// data is stored in the given buffer, at most max_len bytes + /// timeout is the number of milliseconds to wait for the process + /// to produce output, -1 for indefinite /// @return the number of bytes read - std::size_t ReadOut(void *buffer, std::size_t max_len); + std::size_t ReadOut(void *buffer, std::size_t max_len, int timeout); /// close program's output stream void CloseOut(); /// read from the process' error stream /// data is stored in the given buffer, at most max_len bytes + /// timeout is the number of milliseconds to wait for the process + /// to produce output, -1 for indefinite /// @return the number of bytes read - std::size_t ReadErr(void *buffer, std::size_t max_len); + std::size_t ReadErr(void *buffer, std::size_t max_len, int timeout); /// close program's output stream void CloseErr(); diff --git a/src/app/proc.cpp b/src/app/proc.cpp index dff8dee..1373a91 100644 --- a/src/app/proc.cpp +++ b/src/app/proc.cpp @@ -9,6 +9,7 @@ # include # include # include +# include # include #endif @@ -36,10 +37,10 @@ struct Process::Impl { size_t WriteIn(const void *buffer, size_t max_len); void CloseIn(); - size_t ReadOut(void *buffer, size_t max_len); + size_t ReadOut(void *buffer, size_t max_len, int timeout); void CloseOut(); - size_t ReadErr(void *buffer, size_t max_len); + size_t ReadErr(void *buffer, size_t max_len, int timeout); void CloseErr(); void Terminate(); @@ -96,16 +97,16 @@ void Process::CloseIn() { impl->CloseIn(); } -size_t Process::ReadOut(void *buffer, size_t max_len) { - return impl->ReadOut(buffer, max_len); +size_t Process::ReadOut(void *buffer, size_t max_len, int timeout) { + return impl->ReadOut(buffer, max_len, timeout); } void Process::CloseOut() { impl->CloseOut(); } -size_t Process::ReadErr(void *buffer, size_t max_len) { - return impl->ReadErr(buffer, max_len); +size_t Process::ReadErr(void *buffer, size_t max_len, int timeout) { + return impl->ReadErr(buffer, max_len, timeout); } void Process::CloseErr() { @@ -297,14 +298,35 @@ void Process::Impl::CloseIn() { in_closed = true; } -size_t Process::Impl::ReadOut(void *buffer, size_t max_len) { +size_t Process::Impl::ReadOut(void *buffer, size_t max_len, int timeout) { #ifdef _WIN32 + // TODO: timeout implementation for windows child process I/O DWORD ret; if (!ReadFile(fd_out[0], buffer, max_len, &ret, nullptr)) { throw runtime_error("failed to read from child process' output stream"); } return ret; #else + if (timeout >= 0) { + fd_set read_set; + fd_set error_set; + FD_ZERO(&read_set); + FD_ZERO(&error_set); + FD_SET(fd_out[0], &read_set); + FD_SET(fd_out[0], &error_set); + timeval timer; + timer.tv_sec = timeout / 1000; + timer.tv_usec = (timeout % 1000) * 1000; + if (select(fd_out[0] + 1, &read_set, nullptr, &error_set, &timer) == -1) { + throw SysError("error waiting on child process' output stream"); + } + if (FD_ISSET(fd_out[0], &error_set)) { + throw runtime_error("error condition on child process' output stream"); + } + if (!FD_ISSET(fd_out[0], &read_set)) { + throw runtime_error("timeout while waiting on child process' output stream"); + } + } int ret = read(fd_out[0], buffer, max_len); if (ret < 0) { if (errno == EAGAIN) { @@ -329,14 +351,35 @@ void Process::Impl::CloseOut() { out_closed = true; } -size_t Process::Impl::ReadErr(void *buffer, size_t max_len) { +size_t Process::Impl::ReadErr(void *buffer, size_t max_len, int timeout) { #ifdef _WIN32 + // TODO: timeout implementation for windows child process I/O DWORD ret; if (!ReadFile(fd_err[0], buffer, max_len, &ret, nullptr)) { throw runtime_error("failed to read from child process' error stream"); } return ret; #else + if (timeout >= 0) { + fd_set read_set; + fd_set error_set; + FD_ZERO(&read_set); + FD_ZERO(&error_set); + FD_SET(fd_err[0], &read_set); + FD_SET(fd_err[0], &error_set); + timeval timer; + timer.tv_sec = timeout / 1000; + timer.tv_usec = (timeout % 1000) * 1000; + if (select(fd_err[0] + 1, &read_set, nullptr, &error_set, &timer) == -1) { + throw SysError("error waiting on child process' error stream"); + } + if (FD_ISSET(fd_err[0], &error_set)) { + throw runtime_error("error condition on child process' error stream"); + } + if (!FD_ISSET(fd_err[0], &read_set)) { + throw runtime_error("timeout while waiting on child process' error stream"); + } + } int ret = read(fd_err[0], buffer, max_len); if (ret < 0) { if (errno == EAGAIN) { diff --git a/tst/app/ProcessTest.cpp b/tst/app/ProcessTest.cpp index a16bfb3..561a3dd 100644 --- a/tst/app/ProcessTest.cpp +++ b/tst/app/ProcessTest.cpp @@ -53,7 +53,7 @@ void ProcessTest::testStream() { const string expected_output("hello, world\n"); Process proc("/usr/bin/env", { "env", "echo", test_input.c_str() }); char buffer[expected_output.length() + 1]; - size_t len = proc.ReadOut(buffer, sizeof(buffer)); + size_t len = proc.ReadOut(buffer, sizeof(buffer), 1000); const string output(buffer, len); int status = proc.Join(); CPPUNIT_ASSERT_EQUAL_MESSAGE( @@ -72,7 +72,7 @@ void ProcessTest::testStream() { const string expected_output("hello, error\n"); Process proc("/usr/bin/env", { "env", "sh", "-c", "echo $1 >&2", "echo", test_input.c_str() }, { }); char buffer[expected_output.length() + 1]; - size_t len = proc.ReadErr(buffer, sizeof(buffer)); + size_t len = proc.ReadErr(buffer, sizeof(buffer), 1000); const string output(buffer, len); int status = proc.Join(); CPPUNIT_ASSERT_EQUAL_MESSAGE( @@ -99,7 +99,7 @@ void ProcessTest::testStream() { proc.CloseIn(); char buffer[expected_output.length() + 1]; - len = proc.ReadOut(buffer, sizeof(buffer)); + len = proc.ReadOut(buffer, sizeof(buffer), 1000); const string output(buffer, len); int status = proc.Join(); CPPUNIT_ASSERT_EQUAL_MESSAGE( @@ -126,7 +126,7 @@ void ProcessTest::testEnv() { const string expected_output("Hello, environment\n"); Process proc("/usr/bin/env", { "env", "sh", "-c", "echo $BLANK_ENV_TEST" }, { "BLANK_ENV_TEST=" + test_input }); char buffer[expected_output.length() + 1]; - size_t len = proc.ReadOut(buffer, sizeof(buffer)); + size_t len = proc.ReadOut(buffer, sizeof(buffer), 1000); const string output(buffer, len); int status = proc.Join(); CPPUNIT_ASSERT_EQUAL_MESSAGE( @@ -143,5 +143,21 @@ void ProcessTest::testEnv() { #endif } +void ProcessTest::testTimeout() { +#ifdef __WIN32 +# error "TODO: implement Process tests for windows" +#else + Process proc("/usr/bin/env", { "env", "cat" }); + char buffer; + CPPUNIT_ASSERT_THROW_MESSAGE( + "read timeout on child process' stdout should throw", + proc.ReadOut(&buffer, 1, 1), std::runtime_error); + CPPUNIT_ASSERT_THROW_MESSAGE( + "read timeout on child process' stderr should throw", + proc.ReadErr(&buffer, 1, 1), std::runtime_error); + proc.Terminate(); +#endif +} + } } diff --git a/tst/app/ProcessTest.hpp b/tst/app/ProcessTest.hpp index 95217e3..87ebf44 100644 --- a/tst/app/ProcessTest.hpp +++ b/tst/app/ProcessTest.hpp @@ -15,6 +15,7 @@ CPPUNIT_TEST_SUITE(ProcessTest); CPPUNIT_TEST(testExit); CPPUNIT_TEST(testStream); CPPUNIT_TEST(testEnv); +CPPUNIT_TEST(testTimeout); CPPUNIT_TEST_SUITE_END(); @@ -25,6 +26,7 @@ public: void testExit(); void testStream(); void testEnv(); + void testTimeout(); }; diff --git a/tst/integration/ServerTest.cpp b/tst/integration/ServerTest.cpp index c5ca60e..8fe7788 100644 --- a/tst/integration/ServerTest.cpp +++ b/tst/integration/ServerTest.cpp @@ -9,20 +9,20 @@ namespace blank { namespace test { void ServerTest::setUp() { - + instance.reset(new TestInstance({ "--server" }, true)); + instance->AssertRunning(); } void ServerTest::tearDown() { - + std::unique_ptr inst(std::move(instance)); + inst->Terminate(); + inst->AssertExitStatus(0); + inst->AssertNoError(); } void ServerTest::testStartup() { - TestInstance server({ "--server" }, true); - server.AssertRunning(); - server.Terminate(); - server.AssertExitStatus(0); - server.AssertNoError(); + // setUp and testDown do all the tests } } diff --git a/tst/integration/ServerTest.hpp b/tst/integration/ServerTest.hpp index bc8336d..715a4a2 100644 --- a/tst/integration/ServerTest.hpp +++ b/tst/integration/ServerTest.hpp @@ -1,12 +1,15 @@ #ifndef BLANK_TEST_INTEGRATION_SERVERTEST_HPP_ #define BLANK_TEST_INTEGRATION_SERVERTEST_HPP_ +#include #include namespace blank { namespace test { +class TestInstance; + class ServerTest : public CppUnit::TestFixture { @@ -22,6 +25,9 @@ public: void testStartup(); +private: + std::unique_ptr instance; + }; } diff --git a/tst/integration/StandaloneTest.cpp b/tst/integration/StandaloneTest.cpp index 40930a7..2407775 100644 --- a/tst/integration/StandaloneTest.cpp +++ b/tst/integration/StandaloneTest.cpp @@ -2,6 +2,7 @@ #include "TestInstance.hpp" + CPPUNIT_TEST_SUITE_NAMED_REGISTRATION(blank::test::StandaloneTest, "integration"); @@ -9,33 +10,20 @@ namespace blank { namespace test { void StandaloneTest::setUp() { - + instance.reset(new TestInstance({ "--no-vsync" })); + instance->AssertRunning(); } void StandaloneTest::tearDown() { - + std::unique_ptr inst(std::move(instance)); + inst->Terminate(); + inst->AssertExitStatus(0); + inst->AssertNoError(); } void StandaloneTest::testStartup() { - TestInstance standalone({ "--no-vsync" }); - standalone.AssertRunning(); - try { - standalone.AssertOutputLine("chunk preloading complete"); - standalone.Terminate(); - } catch (...) { - try { - standalone.Terminate(); - } catch (...) { } - std::string output; - standalone.ExhaustError(output); - CPPUNIT_ASSERT_EQUAL_MESSAGE( - "process stderr", - std::string(""), output); - CPPUNIT_FAIL("exception in runtime"); - } - standalone.AssertExitStatus(0); - standalone.AssertNoError(); + instance->AssertOutputLine("chunk preloading complete"); } } diff --git a/tst/integration/StandaloneTest.hpp b/tst/integration/StandaloneTest.hpp index 99c5ad9..bcba75f 100644 --- a/tst/integration/StandaloneTest.hpp +++ b/tst/integration/StandaloneTest.hpp @@ -1,12 +1,15 @@ #ifndef BLANK_TEST_INTEGRATION_STANDALONETEST_HPP_ #define BLANK_TEST_INTEGRATION_STANDALONETEST_HPP_ +#include #include namespace blank { namespace test { +class TestInstance; + class StandaloneTest : public CppUnit::TestFixture { @@ -22,6 +25,9 @@ public: void testStartup(); +private: + std::unique_ptr instance; + }; } diff --git a/tst/integration/TestInstance.cpp b/tst/integration/TestInstance.cpp index 31a5aab..71bc194 100644 --- a/tst/integration/TestInstance.cpp +++ b/tst/integration/TestInstance.cpp @@ -35,7 +35,6 @@ TestInstance::TestInstance(const Process::Arguments &args, bool cmd) , cmd_buf() { if (cmd) { // wait for command service startup - // TODO: timeouts for reading from process WaitOutputLine("listening on TCP port 12354"); // connect to command service conn = tcp::Socket("localhost", 12354); @@ -63,7 +62,7 @@ void TestInstance::WriteInput(const string &data) { void TestInstance::ReadOutputLine(string &line) { while (!out_buf.Extract(line)) { // buffer exhausted, fetch more data - int len = proc.ReadOut(out_buf.WriteHead(), out_buf.Remain()); + int len = proc.ReadOut(out_buf.WriteHead(), out_buf.Remain(), 5000); if (len == 0) { throw runtime_error("failed read from child process' stdout"); } @@ -92,7 +91,7 @@ void TestInstance::WaitOutputLine(const string &expected) { void TestInstance::ExhaustOutput(string &output) { while (!out_buf.Extract(output)) { // buffer exhausted, fetch more data - int len = proc.ReadOut(out_buf.WriteHead(), out_buf.Remain()); + int len = proc.ReadOut(out_buf.WriteHead(), out_buf.Remain(), 5000); if (len == 0) { return; } @@ -112,7 +111,7 @@ void TestInstance::AssertNoOutput() { void TestInstance::ReadErrorLine(string &line) { while (!err_buf.Extract(line)) { // buffer exhausted, fetch more data - int len = proc.ReadErr(err_buf.WriteHead(), err_buf.Remain()); + int len = proc.ReadErr(err_buf.WriteHead(), err_buf.Remain(), 5000); if (len == 0) { throw runtime_error("failed read from child process' stderr"); } @@ -141,7 +140,7 @@ void TestInstance::WaitErrorLine(const string &expected) { void TestInstance::ExhaustError(string &error) { while (!err_buf.Extract(error)) { // buffer exhausted, fetch more data - int len = proc.ReadErr(err_buf.WriteHead(), err_buf.Remain()); + int len = proc.ReadErr(err_buf.WriteHead(), err_buf.Remain(), 5000); if (len == 0) { return; }