cgv
Loading...
Searching...
No Matches
pipe_thread.cxx
1#include "pipe.hpp"
2#include "pipe_thread.h"
3#include "cmdline_tools.h"
4
5#include <cstring>
6#include <mutex>
7
8namespace cgv {
9 namespace os {
10
11queued_output_thread::queued_output_thread(bool _is_binary, unsigned _ms_to_wait)
12{
13 is_binary = _is_binary;
14 ms_to_wait = _ms_to_wait;
15}
17{
19 return;
20 m.lock();
21 connected = true;
22 m.unlock();
23 while (!have_stop_request()) {
24 // check for queued block
25 std::pair<char*, size_t> block = { 0,0 };
26 m.lock();
27 if (!blocks.empty()) {
28 block = blocks.front();
29 blocks.pop_front();
30 }
31 m.unlock();
32 // if this is available,
33 if (block.first) {
34 // send it to the pipe and delete block
35 write_block_to_pipe(block.first, block.second);
36 delete[] block.first;
37 }
38 else {
39 // otherwise check for termination
40 bool all_sent;
41 m.lock();
42 all_sent = all_data_sent;
43 m.unlock();
44 if (all_sent)
45 break;
47 }
48 }
49 // in case of stop request or done, close pipe
50 close();
51}
53{
54 bool result;
55 m.lock();
56 result = connected;
57 m.unlock();
58 return result;
59}
60bool queued_output_thread::send_block(const char* data, size_t count)
61{
62 bool all_sent;
63 m.lock();
64 all_sent = all_data_sent;
65 m.unlock();
66 if (all_sent)
67 return false;
68 bool out_of_memory = false;
69 char* block;
70 try {
71 block = new char[count];
72 }
73 catch (const std::bad_alloc&) {
74 out_of_memory = true;
75 }
76 if (out_of_memory)
77 return false;
78 std::copy(data, data + count, block);
79 m.lock();
80 blocks.push_back({ block, count });
81 m.unlock();
82 return true;
83}
85{
86 size_t nr_blocks;
87 m.lock();
88 nr_blocks = blocks.size();
89 m.unlock();
90 return nr_blocks;
91}
93{
94 size_t nr_bytes = 0;
95 m.lock();
96 for (auto b : blocks)
97 nr_bytes += b.second;
98 m.unlock();
99 return nr_bytes;
100}
102{
103 m.lock();
104 all_data_sent = true;
105 m.unlock();
106}
107
108named_pipe_output_thread::named_pipe_output_thread(const std::string& _pipe_name, bool _is_binary, unsigned _ms_to_wait)
109 : queued_output_thread(_is_binary, _ms_to_wait)
110{
111 pipe_name = _pipe_name;
112}
114{
115 return nes::pipe_root + pipe_name;
116}
118{
119 auto mode = is_binary ? (std::ios_base::out | std::ios_base::binary) : std::ios_base::out;
120 pipe_ptr = new nes::pipe_ostream(pipe_name, mode);
121 if (pipe_ptr->fail()) {
122 delete pipe_ptr;
123 pipe_ptr = 0;
124 return false;
125 }
126 return true;
127}
128void named_pipe_output_thread::write_block_to_pipe(const char* data, size_t count)
129{
130 pipe_ptr->write(data, count);
131}
133{
134 pipe_ptr->close();
135 delete pipe_ptr;
136 pipe_ptr = 0;
137}
138
139pipe_output_thread::pipe_output_thread(const std::string& _cmd, bool _is_binary, unsigned _ms_to_wait)
140 : queued_output_thread(_is_binary, _ms_to_wait)
141{
142 cmd = _cmd;
143}
145{
146 fp = cgv::os::open_system_input(cmd, is_binary);
147 return fp != 0;
148}
149void pipe_output_thread::write_block_to_pipe(const char* data, size_t count)
150{
151 fwrite(data, 1, count, fp);
152}
154{
155 result = cgv::os::close_system_input(fp);
156}
158{
159 return result;
160}
161
162queued_input_thread::queued_input_thread(bool _is_binary, size_t _package_size, size_t _packeges_per_block,
163 unsigned _ms_to_wait)
164{
165 is_binary = _is_binary;
166 ms_to_wait = _ms_to_wait;
167 package_size = _package_size;
168 packages_per_block = _packeges_per_block;
169 package_index = 0;
170
172}
173
175{
176 if (!connect_to_source())
177 return;
178
179 while (!have_stop_request()) {
180 // Allocate buffer for reading
181
182 char* buffer = new char[package_size];
183 size_t bytes_read = read_package_from_pipe(buffer,package_size);
184
185 if (bytes_read > 0) {
186 // Add block to queue if data was read
187 mutex_packages.lock();
188 memcpy(packages + package_size * package_index, buffer, bytes_read);
190 mutex_packages.unlock();
193 }
194 delete[] buffer;
195 }
196 else {
197 // No data, check for termination
198 bool done;
199 mutex_packages.lock();
201 mutex_packages.unlock();
202 delete[] buffer;
203 if (done)
204 break;
206 }
207 }
208
209 // Clean up and close source
210 close();
211}
212
214
215 char* data = new char[package_size * packages_per_block];
216
217 mutex_packages.lock();
218
220 package_index = 0;
221
222 mutex_packages.unlock();
223
224 mutex_data_blocks.lock();
225 data_blocks.emplace_back(std::vector<char>(data, data + package_size * packages_per_block));
226 mutex_data_blocks.unlock();
227
228}
229
231{
232 mutex_data_blocks.lock();
233 if (data_blocks.empty()) {
234 mutex_data_blocks.unlock();
235 return false;
236 }
237
238 auto block = std::move(data_blocks.front());
239 data_blocks.pop_front();
240 mutex_data_blocks.unlock();
241 memcpy(buffer, block.data(),package_size * packages_per_block);
242
243 return true;
244}
246{
247 size_t nr_blocks;
248 mutex_data_blocks.lock();
249 nr_blocks = data_blocks.size();
250 mutex_data_blocks.unlock();
251 return nr_blocks;
252}
254{
255 mutex_packages.lock();
256 all_data_received = true;
257 mutex_packages.unlock();
258}
260{
261 auto mode = is_binary ? (std::ios_base::in | std::ios_base::binary) : std::ios_base::in;
262 pipe_ptr = new nes::pipe_istream(pipe_name, mode);
263 if (pipe_ptr->fail()) {
264 delete pipe_ptr;
265 pipe_ptr = 0;
266 return false;
267 }
268 return true;
269}
270size_t named_pipe_input_thread::read_package_from_pipe(char* buffer,size_t package_size) {
271 if (!pipe_ptr->read(buffer, package_size)) {
272 mutex_packages.lock();
273 all_data_received = true;
274 mutex_packages.unlock();
275 return 0;
276 }
277 return pipe_ptr->gcount();
278}
280{
281 if (pipe_ptr) {
282 pipe_ptr->close();
283 delete pipe_ptr;
284 pipe_ptr = 0;
285 }
286}
287named_pipe_input_thread::named_pipe_input_thread(const std::string& _pipe_name, bool is_binary,
288 size_t _package_size, size_t _packeges_per_block, unsigned _ms_to_wait)
289 : queued_input_thread(is_binary,_package_size,_packeges_per_block, _ms_to_wait)
290{
291 pipe_name = _pipe_name;
292}
293std::string named_pipe_input_thread::get_pipe_path() const
294{
295 return nes::pipe_root + pipe_name;
296}
298 fp = cgv::os::open_system_output(cmd, is_binary);
299 return fp != 0;
300}
301size_t pipe_input_thread::read_package_from_pipe(char* buffer, size_t package_size)
302{
303 if (!fread(buffer, 1, package_size, fp)) {
304 mutex_packages.lock();
305 all_data_received = true;
306 mutex_packages.unlock();
307 return 0;
308 }
309 return package_size;
310}
312{
313 result = cgv::os::close_system_input(fp);
314}
315pipe_input_thread::pipe_input_thread(const std::string& _cmd, bool is_binary, size_t _package_size,size_t _packages_per_block, unsigned _ms_to_wait)
316 : queued_input_thread(is_binary, _package_size, _packages_per_block, _ms_to_wait)
317{
318 cmd = _cmd;
319}
320int pipe_input_thread::get_result() const {
321 return result;
322}
323 } // namespace os
324}
325
bool connect_to_source() override
to be implemented in derived classes
size_t read_package_from_pipe(char *buffer, size_t package_size) override
to be implemented in derived classes
void close() override
to be implemented in derived classes
void write_block_to_pipe(const char *data, size_t count)
write block to named pipe
std::string get_pipe_path() const
return path of pipe that can be used in command line arguments to child/client processes
bool connect_to_child_process()
creates pipe and waits for connection
std::string pipe_name
base name of the pipe
Definition pipe_thread.h:63
named_pipe_output_thread(const std::string &_pipe_name, bool is_binary=true, unsigned _ms_to_wait=20)
construct pipe output thread from pipe name, whether to use binary mode and wait time in ms used when...
void close()
closes named pipe
nes::basic_pipe_ostream< char, std::char_traits< char > > * pipe_ptr
pointer to the named pipe output stream
Definition pipe_thread.h:65
size_t read_package_from_pipe(char *buffer, size_t package_size) override
to be implemented in derived classes
bool connect_to_source() override
to be implemented in derived classes
void close() override
to be implemented in derived classes
void write_block_to_pipe(const char *data, size_t count)
write block to stdin pipe
int result
result value of child process
Definition pipe_thread.h:90
pipe_output_thread(const std::string &_cmd, bool is_binary=true, unsigned _ms_to_wait=20)
construct pipe output thread from system command, whether to use binary mode and wait time in ms used...
void close()
closes stdin pipe and stores result value
FILE * fp
file handle of stdin of child process
Definition pipe_thread.h:88
int get_result() const
return the result value returned from child process which is available only after termination of thre...
std::string cmd
system command to be executed
Definition pipe_thread.h:86
bool connect_to_child_process()
starts child process and connects to its stdin
Base class for system command output pipe or named pipe threads including a queue of data blocks,...
queued_input_thread(bool _is_binary, size_t _package_size, size_t _packeges_per_block, unsigned _ms_to_wait)
construct queued input thread with binary mode flag and wait time in ms used when pipe is empty
size_t package_size
size of an individual package
std::deque< std::vector< char > > data_blocks
deque to store data blocks that are formed by packages
size_t packages_per_block
number of packages needed to form one data block
bool pop_data_block(char *buffer)
if done() had not been called, read a data block from the queue; can fail if no data or done() was ca...
virtual void close()=0
to be implemented in derived classes
virtual size_t read_package_from_pipe(char *buffer, size_t package_size)=0
to be implemented in derived classes
virtual bool connect_to_source()=0
to be implemented in derived classes
char * packages
allocated memory for smaller data packages
size_t get_nr_blocks() const
returns the number of blocks in the queue of not yet read data
void run()
continuously read from pipe and store in queue; if empty, wait in intervals of ms_to_wait millisecond...
size_t package_index
index of the next package to be read
std::mutex mutex_packages
mutex used to protect access to packages
bool is_binary
whether binary mode should be used
std::mutex mutex_data_blocks
mutex used to protect access to data_blocks
unsigned ms_to_wait
time in milliseconds to wait while pipe is empty
void done()
call this to indicate no more data will come
bool all_data_received
flag to mark end of data stream
virtual void move_packages_to_data_blocks()
combine the packages to a data block and push it to the queue
base class for system command input pipe or named pipe threads including a queue of data blocks and a...
Definition pipe_thread.h:20
bool connected
flag that tells whether the pipe has been connected to from the other side
Definition pipe_thread.h:27
size_t get_nr_blocks() const
returns the number of blocks in the queue of not yet written data
virtual void close()=0
to be implemented in derived classes
unsigned ms_to_wait
time in milliseconds to wait while queue is empty
Definition pipe_thread.h:33
bool send_block(const char *data, size_t count)
if done() had not been called, insert a data block into the queue; can fail if done() or out of memor...
virtual void write_block_to_pipe(const char *data, size_t count)=0
to be implemented in derived classes
std::deque< std::pair< char *, size_t > > blocks
deque used to queue the data blocks that should be written to the pipe by the thread
Definition pipe_thread.h:31
void run()
connect to child process and continuously write queue content to pipe; if empty wait in intervals of ...
bool is_binary
whether binary mode should be used
Definition pipe_thread.h:25
void done()
call this to announce that all data has been sent
virtual bool connect_to_child_process()=0
to be implemented in derived classes
queued_output_thread(bool is_binary=true, unsigned _ms_to_wait=20)
construct queued output thread from flag, whether to use binary mode and wait time in ms used when qu...
cgv::os::mutex m
mutex used to protect access to blocks
Definition pipe_thread.h:29
size_t get_nr_bytes() const
returns the number of bytes in the queue of not yet written data, what is more time consuming than ge...
bool has_connection() const
returns true as soon as child process has connected to pipe
static void wait(unsigned millisec)
wait the given number of milliseconds
bool have_stop_request()
check if there is a stop request
Definition thread.h:79
the cgv namespace
Definition print.h:11
void unlock()
unlock the mutex
void lock()
lock the mutex (if the mutex is already locked, the caller is blocked until the mutex becomes availab...