From: Daniel Karbach Date: Mon, 21 Nov 2016 15:43:39 +0000 (+0100) Subject: allow individual closing of process streams X-Git-Url: http://git.localhorst.tv/?p=blank.git;a=commitdiff_plain;h=0644360107ca50d681ae8b8cad608c7bc2ec7a40 allow individual closing of process streams --- diff --git a/src/app/Process.hpp b/src/app/Process.hpp index 490cf07..6f5a5ae 100644 --- a/src/app/Process.hpp +++ b/src/app/Process.hpp @@ -28,18 +28,28 @@ public: /// data is taken from given buffer, at most max_len bytes /// @return the number of bytes written std::size_t WriteIn(const void *buffer, std::size_t max_len); + /// close program's input stream + void CloseIn(); + /// read from the process' output stream /// data is stored in the given buffer, at most max_len bytes /// @return the number of bytes read std::size_t ReadOut(void *buffer, std::size_t max_len); + /// close program's output stream + void CloseOut(); + /// read from the process' error stream /// data is stored in the given buffer, at most max_len bytes /// @return the number of bytes read std::size_t ReadErr(void *buffer, std::size_t max_len); + /// close program's output stream + void CloseErr(); /// ask the process nicely to terminate /// (except on win32) void Terminate(); + /// check if the process has terminated + bool Terminated(); /// wait until the process exits and fetch its exit status int Join(); @@ -47,9 +57,6 @@ private: struct Impl; std::unique_ptr impl; - bool joined; - int status; - }; } diff --git a/src/app/proc.cpp b/src/app/proc.cpp index add16d1..876eefc 100644 --- a/src/app/proc.cpp +++ b/src/app/proc.cpp @@ -29,12 +29,25 @@ struct Process::Impl { ~Impl(); size_t WriteIn(const void *buffer, size_t max_len); + void CloseIn(); + size_t ReadOut(void *buffer, size_t max_len); + void CloseOut(); + size_t ReadErr(void *buffer, size_t max_len); + void CloseErr(); void Terminate(); + bool Terminated(); int Join(); + bool joined; + int status; + + bool in_closed; + bool out_closed; + bool err_closed; + #ifdef _WIN32 PROCESS_INFORMATION pi; HANDLE fd_in[2]; @@ -54,9 +67,7 @@ Process::Process( const string &path, const Arguments &args, const Environment &env) -: impl(new Impl(path, args, env)) -, joined(false) -, status(0) { +: impl(new Impl(path, args, env)) { } @@ -69,36 +80,48 @@ size_t Process::WriteIn(const void *buffer, size_t max_len) { return impl->WriteIn(buffer, max_len); } +void Process::CloseIn() { + impl->CloseIn(); +} + size_t Process::ReadOut(void *buffer, size_t max_len) { return impl->ReadOut(buffer, max_len); } +void Process::CloseOut() { + impl->CloseOut(); +} + size_t Process::ReadErr(void *buffer, size_t max_len) { return impl->ReadErr(buffer, max_len); } +void Process::CloseErr() { + impl->CloseErr(); +} + void Process::Terminate() { - if (!joined) { - impl->Terminate(); - } + impl->Terminate(); +} + +bool Process::Terminated() { + return impl->Terminated(); } int Process::Join() { - if (joined) { - return status; - } else { - status = impl->Join(); - joined = true; - return status; - } + return impl->Join(); } Process::Impl::Impl( const string &path_in, const Arguments &args, - const Environment &env -) { + const Environment &env) +: joined(false) +, status(0) +, in_closed(false) +, out_closed(false) +, err_closed(false) { const char *path = path_in.c_str(); char *envp[env.size() + 1]; for (size_t i = 0; i < env.size(); ++i) { @@ -214,7 +237,9 @@ Process::Impl::Impl( #endif Process::Impl::~Impl() { - + CloseIn(); + CloseOut(); + CloseErr(); } size_t Process::Impl::WriteIn(const void *buffer, size_t max_len) { @@ -237,6 +262,18 @@ size_t Process::Impl::WriteIn(const void *buffer, size_t max_len) { #endif } +void Process::Impl::CloseIn() { + if (in_closed) { + return; + } +#ifdef _WIN32 + CloseHandle(fd_in[1]); +#else + close(fd_in[1]); +#endif + in_closed = true; +} + size_t Process::Impl::ReadOut(void *buffer, size_t max_len) { #ifdef _WIN32 DWORD ret; @@ -257,6 +294,18 @@ size_t Process::Impl::ReadOut(void *buffer, size_t max_len) { #endif } +void Process::Impl::CloseOut() { + if (out_closed) { + return; + } +#ifdef _WIN32 + CloseHandle(fd_out[0]); +#else + close(fd_out[0]); +#endif + out_closed = true; +} + size_t Process::Impl::ReadErr(void *buffer, size_t max_len) { #ifdef _WIN32 DWORD ret; @@ -277,7 +326,23 @@ size_t Process::Impl::ReadErr(void *buffer, size_t max_len) { #endif } +void Process::Impl::CloseErr() { + if (err_closed) { + return; + } +#ifdef _WIN32 + CloseHandle(fd_err[0]); +#else + close(fd_err[0]); +#endif + err_closed = true; +} + void Process::Impl::Terminate() { + if (joined) { + // can only terminate once + return; + } #ifdef _WIN32 if (!TerminateProcess(pi.hProcess, -1)) { #else @@ -287,27 +352,64 @@ void Process::Impl::Terminate() { } } -int Process::Impl::Join() { +bool Process::Impl::Terminated() { + if (joined) { + return true; + } #ifdef _WIN32 - CloseHandle(fd_in[1]); - CloseHandle(fd_out[0]); - CloseHandle(fd_err[0]); + DWORD exit_code; + GetExitCodeProcess(pi.hProcess, &exit_code); + if (exit_code == STILL_ACTIVE) { + return false; + } else { + CloseHandle(pi.hProcess); + CloseHandle(pi.hThread); + status = exit_code; + joined = true; + return true; + } +#else + int stat; + int result = waitpid(pid, &stat, WNOHANG); + if (result == -1) { + throw SysError("error polling child process"); + } else if (result == 0) { + return false; + } else if (result == pid) { + // child just exited, reap + if (WIFEXITED(stat)) { + // autonomous termination + status = WEXITSTATUS(stat); + } else if (WIFSIGNALED(stat)) { + // signalled termination + status = WTERMSIG(stat); + } + joined = true; + return true; + } else { + throw runtime_error("bogus return value of waitpid"); + } +#endif +} +int Process::Impl::Join() { + if (joined) { + // can only join once + return status; + } +#ifdef _WIN32 DWORD exit_code; WaitForSingleObject(pi.hProcess, INFINITE); GetExitCodeProcess(pi.hProcess, &exit_code); CloseHandle(pi.hProcess); CloseHandle(pi.hThread); - return exit_code; + status = exit_code; + joined = true; + return status; #else - // close streams before waiting on child termination - close(fd_in[1]); - close(fd_out[0]); - close(fd_err[0]); - while (true) { - int status; - int result = waitpid(pid, &status, 0); + int stat; + int result = waitpid(pid, &stat, 0); if (result == -1) { throw SysError("error waiting on child process"); } @@ -315,13 +417,17 @@ int Process::Impl::Join() { // should in theory only happen with WNOHANG set continue; } - if (WIFEXITED(status)) { + if (WIFEXITED(stat)) { // autonomous termination - return WEXITSTATUS(status); + status = WEXITSTATUS(stat); + joined = true; + return status; } - if (WIFSIGNALED(status)) { + if (WIFSIGNALED(stat)) { // signalled termination - return WTERMSIG(status); + status = WTERMSIG(stat); + joined = true; + return status; } // otherwise, child probably signalled stop/continue, which we // don't care about (please don't tell youth welfare), so try again diff --git a/tst/app/ProcessTest.cpp b/tst/app/ProcessTest.cpp index db181aa..a5d2cc5 100644 --- a/tst/app/ProcessTest.cpp +++ b/tst/app/ProcessTest.cpp @@ -95,6 +95,8 @@ void ProcessTest::testStream() { CPPUNIT_ASSERT_EQUAL_MESSAGE( "unexpected length of input to cat", test_input.size(), len); + // close input stream so cat knows we're done + proc.CloseIn(); char buffer[expected_output.length() + 1]; len = proc.ReadOut(buffer, sizeof(buffer)); diff --git a/tst/integration/ServerTest.cpp b/tst/integration/ServerTest.cpp index 9cee9ee..c5ca60e 100644 --- a/tst/integration/ServerTest.cpp +++ b/tst/integration/ServerTest.cpp @@ -19,6 +19,10 @@ void ServerTest::tearDown() { void ServerTest::testStartup() { TestInstance server({ "--server" }, true); + server.AssertRunning(); + server.Terminate(); + server.AssertExitStatus(0); + server.AssertNoError(); } } diff --git a/tst/integration/TestInstance.cpp b/tst/integration/TestInstance.cpp index f9bc727..495ba6c 100644 --- a/tst/integration/TestInstance.cpp +++ b/tst/integration/TestInstance.cpp @@ -51,10 +51,14 @@ TestInstance::~TestInstance() { void TestInstance::WriteInput(const string &data) { + AssertRunning(); const char *i = data.c_str(); const char *end = i + data.length(); while (i != end) { size_t len = proc.WriteIn(i, end - i); + if (len == 0) { + throw runtime_error("failed write to child process' stdin"); + } i += len; } } @@ -62,7 +66,11 @@ void TestInstance::WriteInput(const string &data) { void TestInstance::ReadOutputLine(string &line) { while (!out_buf.Extract(line)) { // buffer exhausted, fetch more data - out_buf.Update(proc.ReadOut(out_buf.WriteHead(), out_buf.Remain())); + int len = proc.ReadOut(out_buf.WriteHead(), out_buf.Remain()); + if (len == 0) { + throw runtime_error("failed read from child process' stdout"); + } + out_buf.Update(len); } } @@ -84,10 +92,34 @@ void TestInstance::WaitOutputLine(const string &expected) { } } +void TestInstance::ExhaustOutput(string &output) { + while (!out_buf.Extract(output)) { + // buffer exhausted, fetch more data + int len = proc.ReadOut(out_buf.WriteHead(), out_buf.Remain()); + if (len == 0) { + return; + } + out_buf.Update(len); + } +} + +void TestInstance::AssertNoOutput() { + string output; + ExhaustOutput(output); + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "test instanced produced unexpected output", + string(""), output); +} + + void TestInstance::ReadErrorLine(string &line) { while (!err_buf.Extract(line)) { // buffer exhausted, fetch more data - err_buf.Update(proc.ReadErr(err_buf.WriteHead(), err_buf.Remain())); + int len = proc.ReadErr(err_buf.WriteHead(), err_buf.Remain()); + if (len == 0) { + throw runtime_error("failed read from child process' stderr"); + } + err_buf.Update(len); } } @@ -109,10 +141,45 @@ void TestInstance::WaitErrorLine(const string &expected) { } } +void TestInstance::ExhaustError(string &error) { + while (!err_buf.Extract(error)) { + // buffer exhausted, fetch more data + int len = proc.ReadErr(err_buf.WriteHead(), err_buf.Remain()); + if (len == 0) { + return; + } + err_buf.Update(len); + } +} + +void TestInstance::AssertNoError() { + string error; + ExhaustError(error); + CPPUNIT_ASSERT_EQUAL_MESSAGE( + "test instanced produced unexpected error output", + string(""), error); +} + + +void TestInstance::Terminate() { + proc.Terminate(); +} + +void TestInstance::AssertRunning() { + CPPUNIT_ASSERT_MESSAGE( + "test instance terminated unexpectedly", + !proc.Terminated()); +} + +void TestInstance::AssertTerminated() { + CPPUNIT_ASSERT_MESSAGE( + "test instance did not terminate as expected", + proc.Terminated()); +} void TestInstance::AssertExitStatus(int expected) { CPPUNIT_ASSERT_EQUAL_MESSAGE( - "unexpected line in stderr", + "unexpected exit status from child program", expected, proc.Join()); } diff --git a/tst/integration/TestInstance.hpp b/tst/integration/TestInstance.hpp index 3bd23d4..354a668 100644 --- a/tst/integration/TestInstance.hpp +++ b/tst/integration/TestInstance.hpp @@ -26,22 +26,36 @@ public: /// sure to include a newline character. void WriteInput(const std::string &data); - /// read next line from programs stdout + /// read next line from program's stdout void ReadOutputLine(std::string &line); /// assert that the next line the program writes to stdout will /// be the given one (without a trailing newline character) void AssertOutputLine(const std::string &line); /// wait until program writes given line to stdout void WaitOutputLine(const std::string &line); + /// read from program's stdout until EOF + void ExhaustOutput(std::string &output); + /// assert that the program produces no more output + void AssertNoOutput(); - /// read next line from programs stderr + /// read next line from program's stderr void ReadErrorLine(std::string &line); /// assert that the next line the program writes to stderr will /// be the given one (without a trailing newline character) void AssertErrorLine(const std::string &line); /// wait until program writes given line to stderr void WaitErrorLine(const std::string &line); - + /// read from program's stdout until EOF + void ExhaustError(std::string &error); + /// assert that the program produces no more output on stderr + void AssertNoError(); + + /// send termination signal + void Terminate(); + /// assert that the program has not exited + void AssertRunning(); + /// assert that the program has exited + void AssertTerminated(); /// make sure the process terminated with given status void AssertExitStatus(int expected);