1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234
//! Read available data from file descriptors without blocking //! //! Useful for nonblocking reads from sockets, named pipes, and child stdout/stderr //! //! # Example //! //! ```no_run //! use std::io::Read; //! use std::process::{Command, Stdio}; //! use std::time::Duration; //! use nonblock::NonBlockingReader; //! //! let mut child = Command::new("some-executable") //! .stdout(Stdio::piped()) //! .spawn().unwrap(); //! let stdout = child.stdout.take().unwrap(); //! let mut noblock_stdout = NonBlockingReader::from_fd(stdout).unwrap(); //! while !noblock_stdout.is_eof() { //! let mut buf = String::new(); //! noblock_stdout.read_available_to_string(&mut buf).unwrap(); //! std::thread::sleep(Duration::from_secs(5)); //! } //! ``` extern crate libc; use std::os::unix::io::{RawFd, AsRawFd}; use std::io::{self, Read, ErrorKind}; use libc::{F_GETFL, F_SETFL, fcntl, O_NONBLOCK}; /// Simple non-blocking wrapper for reader types that implement AsRawFd pub struct NonBlockingReader<R: AsRawFd + Read> { eof: bool, reader: R, } impl<R: AsRawFd + Read> NonBlockingReader<R> { /// Initialize a NonBlockingReader from the reader's file descriptor. /// /// The reader will be managed internally, /// and O_NONBLOCK will be set the file descriptor. pub fn from_fd(reader: R) -> io::Result<NonBlockingReader<R>> { let fd = reader.as_raw_fd(); try!(set_blocking(fd, false)); Ok(NonBlockingReader { reader: reader, eof: false, }) } /// Consume this NonBlockingReader and return the blocking version /// of the internally managed reader. /// /// This will disable O_NONBLOCK on the file descriptor, /// and any data read from the NonBlockingReader before calling `into_blocking` /// will already have been consumed from the reader. pub fn into_blocking(self) -> io::Result<R> { let fd = self.reader.as_raw_fd(); try!(set_blocking(fd, true)); Ok(self.reader) } /// Indicates if EOF has been reached for the reader. /// /// Currently this defaults to false until one of the `read_available` methods is called, /// but this may change in the future if I stumble on a compelling way /// to check for EOF without consuming any of the internal reader. pub fn is_eof(&self) -> bool { self.eof } /// Reads any available data from the reader without blocking, placing them into `buf`. /// /// If successful, this function will return the total number of bytes read. /// 0 bytes read may indicate the EOF has been reached or that reading /// would block because there is not any data immediately available. /// Call `is_eof()` after this method to determine if EOF was reached. /// /// ## Errors /// /// If this function encounters an error of the kind `ErrorKind::Interrupted` /// then the error is ignored and the operation will continue. /// If it encounters `ErrorKind::WouldBlock`, then this function immediately returns /// the total number of bytes read so far. /// /// If any other read error is encountered then this function immediately returns. /// Any bytes which have already been read will be appended to buf. /// /// ## Examples /// ```no_run /// # use std::io::Read; /// # use std::net::TcpStream; /// # use std::time::Duration; /// # use nonblock::NonBlockingReader; /// # /// let client = TcpStream::connect("127.0.0.1:34567").unwrap(); /// let mut noblock_stdout = NonBlockingReader::from_fd(client).unwrap(); /// let mut buf = Vec::new(); /// noblock_stdout.read_available(&mut buf).unwrap(); /// ``` pub fn read_available(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> { let mut buf_len = 0; loop { let mut bytes = [0u8; 1024]; match self.reader.read(&mut bytes[..]) { // EOF Ok(0) => { self.eof = true; break; } // Not EOF, but no more data currently available Err(ref err) if err.kind() == ErrorKind::WouldBlock => { self.eof = false; break; } // Ignore interruptions, continue reading Err(ref err) if err.kind() == ErrorKind::Interrupted => {} // bytes available Ok(len) => { buf_len += len; buf.append(&mut bytes[0..(len)].to_owned()) } // IO Error encountered Err(err) => { return Err(err); } } } Ok(buf_len) } /// Reads any available data from the reader without blocking, placing them into `buf`. /// /// If successful, this function returns the number of bytes which were read and appended to buf. /// /// ## Errors /// /// This function inherits all the possible errors of `read_available()`. /// In the case of errors that occur after successfully reading some data, /// the successfully read data will still be parsed and appended to `buf`. /// /// Additionally, if the read data cannot be parsed as UTF-8, /// then `buf` will remain unmodified, and this method will return `ErrorKind::InvalidData` /// with the `FromUtf8Error` containing any data that was read. /// /// ## Examples /// ```no_run /// # use std::io::Read; /// # use std::process::{Command, Stdio}; /// # use std::time::Duration; /// # use nonblock::NonBlockingReader; /// # /// let mut child = Command::new("foo").stdout(Stdio::piped()).spawn().unwrap(); /// let stdout = child.stdout.take().unwrap(); /// let mut noblock_stdout = NonBlockingReader::from_fd(stdout).unwrap(); /// let mut buf = String::new(); /// noblock_stdout.read_available_to_string(&mut buf).unwrap(); /// ``` /// /// In theory, since this function only reads immediately available data, /// There may not be any guarantee that the data immediately available ends /// on a UTF-8 alignment, so it might be worth a bufferred wrapper /// that manages the captures a final non-UTF-8 character and prepends it to the next call, /// but in practice, this has worked as expected. pub fn read_available_to_string(&mut self, buf: &mut String) -> io::Result<usize> { let mut byte_buf: Vec<u8> = Vec::with_capacity(1024); let res = self.read_available(&mut byte_buf); match String::from_utf8(byte_buf) { Ok(utf8_buf) => { // append any read data before returning the `read_available` result buf.push_str(&utf8_buf); res } Err(err) => { // check for read error before returning the UTF8 Error let _ = try!(res); Err(io::Error::new(ErrorKind::InvalidData, err)) } } } } fn set_blocking(fd: RawFd, blocking: bool) -> io::Result<()> { let flags = unsafe { fcntl(fd, F_GETFL, 0) }; if flags < 0 { return Err(io::Error::last_os_error()); } let flags = if blocking { flags & !O_NONBLOCK } else { flags | O_NONBLOCK }; let res = unsafe { fcntl(fd, F_SETFL, flags) }; if res != 0 { return Err(io::Error::last_os_error()); } Ok(()) } #[cfg(test)] mod tests { use super::NonBlockingReader; use std::sync::mpsc::channel; use std::net::{TcpListener, TcpStream}; use std::thread; use std::io::Write; #[test] fn it_works() { let server = TcpListener::bind("127.0.0.1:34567").unwrap(); let (tx, rx) = channel(); thread::spawn(move || { let (stream, _) = server.accept().unwrap(); tx.send(stream).unwrap(); }); let client = TcpStream::connect("127.0.0.1:34567").unwrap(); let mut stream = rx.recv().unwrap(); let mut nonblocking = NonBlockingReader::from_fd(client).unwrap(); let mut buf = Vec::new(); assert_eq!(nonblocking.read_available(&mut buf).unwrap(), 0); assert_eq!(buf, b""); stream.write(b"foo").unwrap(); assert_eq!(nonblocking.read_available(&mut buf).unwrap(), 3); assert_eq!(buf, b"foo"); } }