#pragma once #include <cstddef> #include <memory> #include <chrono> #include <string> #include <system_error> #include <ostream> #include <sys/types.h> #ifdef _WIN32 #include <winsock2.h> #include <ws2tcpip.h> #include <stdio.h> #include <Ws2def.h> #pragma comment(lib, "Ws2_32.lib") #else #include <sys/socket.h> #include <arpa/inet.h> #include <netdb.h> #endif #include <realm/status.hpp> #include <realm/util/features.h> #include <realm/util/assert.hpp> #include <realm/util/backtrace.hpp> #include <realm/util/basic_system_errors.hpp> #include <realm/util/bind_ptr.hpp> #include <realm/util/buffer.hpp> #include <realm/util/misc_ext_errors.hpp> #include <realm/util/scope_exit.hpp> // Linux epoll #if defined(REALM_USE_EPOLL) && !REALM_ANDROID #define REALM_NETWORK_USE_EPOLL 1 #else #define REALM_NETWORK_USE_EPOLL 0 #endif // FreeBSD Kqueue. // // Available on Mac OS X, FreeBSD, NetBSD, OpenBSD #if (defined(__MACH__) && defined(__APPLE__)) || defined(__FreeBSD__) || defined(__NetBSD__) || defined(__OpenBSD__) #if !defined(REALM_HAVE_KQUEUE) #if !defined(REALM_DISABLE_UTIL_NETWORK_KQUEUE) #define REALM_HAVE_KQUEUE 1 #endif #endif #endif #if !defined(REALM_HAVE_KQUEUE) #define REALM_HAVE_KQUEUE 0 #endif // FIXME: Unfinished business around `Address::m_ip_v6_scope_id`. namespace realm::sync::network { /// \brief TCP/IP networking API. /// /// The design of this networking API is heavily inspired by the ASIO C++ /// library (http://think-async.com). /// /// /// ### Thread safety /// /// A *service context* is a set of objects consisting of an instance of /// Service, and all the objects that are associated with that instance (\ref /// Resolver, \ref Socket`, \ref Acceptor`, \ref DeadlineTimer, and /// \ref ssl::Stream). /// /// In general, it is unsafe for two threads to call functions on the same /// object, or on different objects in the same service context. This also /// applies to destructors. 
/// Notable exceptions are the fully thread-safe functions, such as
/// Service::post(), Service::stop(), and Service::reset().
///
/// On the other hand, it is always safe for two threads to call functions on
/// objects belonging to different service contexts.
///
/// One implication of these rules is that at most one thread may execute run()
/// at any given time, and if one thread is executing run(), then no other
/// thread is allowed to access objects in the same service context (with the
/// mentioned exceptions).
///
/// Unless otherwise specified, free-standing objects, such as \ref
/// StreamProtocol, \ref Address, \ref Endpoint, and \ref Endpoint::List are
/// fully thread-safe as long as they are not mutated. If one thread is mutating
/// such an object, no other thread may access it. Note that these free-standing
/// objects are not associated with an instance of Service, and are therefore
/// not part of a service context.
///
///
/// ### Comparison with ASIO
///
/// There is a crucial difference between the two libraries with regard to the
/// guarantees that are provided about the cancelability of asynchronous
/// operations. The Realm networking library (this library) considers an
/// asynchronous operation to be complete precisely when the completion handler
/// starts to execute, and it guarantees that such an operation is cancelable up
/// until that point in time. In particular, if `cancel()` is called on a socket
/// or a deadline timer object before the completion handler starts to execute,
/// then that operation will be canceled, and will receive
/// `error::operation_aborted`. This guarantee is possible to provide (and free
/// of ambiguities) precisely because this library prohibits multiple threads
/// from executing the event loop concurrently, and because `cancel()` is
/// allowed to be called only from a completion handler (executed by the event
/// loop thread) or while no thread is executing the event loop.
/// This guarantee allows for safe destruction of sockets and deadline timers
/// as long as the completion handlers react appropriately to
/// `error::operation_aborted`, in particular, that they do not attempt to
/// access the socket or deadline timer object in such cases.
///
/// ASIO, on the other hand, allows for an asynchronous operation to complete
/// and become **uncancellable** before the completion handler starts to
/// execute. For this reason, it is possible with ASIO to get the completion
/// handler of an asynchronous wait operation to start executing and receive an
/// error code other than "operation aborted" at a point in time where
/// `cancel()` has already been called on the deadline timer object, or even at
/// a point in time where the deadline timer has been destroyed. This seems
/// like an inevitable consequence of the fact that ASIO allows for multiple
/// threads to execute the event loop concurrently. This generally forces ASIO
/// applications to invent ways of extending the lifetime of deadline timer and
/// socket objects until the completion handler starts executing.
///
/// IMPORTANT: Even if ASIO is used in a way where at most one thread executes
/// the event loop, there is still no guarantee that an asynchronous operation
/// remains cancelable up until the point in time where the completion handler
/// starts to execute.
std::string host_name();


// Forward declarations of the networking types (alphabetical order; the
// order of forward declarations carries no meaning).
class Acceptor;
class Address;
class DeadlineTimer;
class Endpoint;
class ReadAheadBuffer;
class Resolver;
class Service;
class Socket;
class SocketBase;
class StreamProtocol;

namespace ssl {
class Stream;
} // namespace ssl


/// \brief An IP protocol descriptor.
class StreamProtocol {
public:
    // Special members. The constructor is defined out of line; the destructor
    // is user-provided with an empty body.
    StreamProtocol();
    ~StreamProtocol() noexcept {}

    // Named factories, one per IP version (defined out of line).
    static StreamProtocol ip_v4();
    static StreamProtocol ip_v6();

    // Accessors (defined out of line). Presumably these expose the stored
    // family and protocol values -- confirm against the definitions.
    int family() const;
    int protocol() const;

    // IP-version predicates (defined out of line).
    bool is_ip_v4() const;
    bool is_ip_v6() const;

private:
    // Service and SocketBase are granted access to the raw fields below.
    friend class Service;
    friend class SocketBase;

    // Not initialized in-class; the out-of-line constructor is expected to
    // set them.
    int m_family;
    int m_socktype;
    int m_protocol;
};


/// \brief An IP address (IPv4 or IPv6).
class Address {
public:
    // Special members. The constructor is defined out of line; the destructor
    // is user-provided with an empty body.
    Address();
    ~Address() noexcept {}

    // IP-version predicates (defined out of line).
    bool is_ip_v4() const;
    bool is_ip_v6() const;

    // Stream insertion for any character/traits combination (defined out of
    // line).
    template <class C, class T>
    friend std::basic_ostream<C, T>& operator<<(std::basic_ostream<C, T>& out, const Address& addr);

private:
    // Endpoint and the error-code overload of make_address() are granted
    // access to the storage below.
    friend class Endpoint;
    friend Address make_address(const char*, std::error_code&) noexcept;

    using ip_v4_type = in_addr;
    using ip_v6_type = in6_addr;

    // Storage shared by the two address representations.
    union union_type {
        ip_v4_type m_ip_v4;
        ip_v6_type m_ip_v6;
    };

    union_type m_union;
    std::uint_least32_t m_ip_v6_scope_id = 0;
    bool m_is_ip_v6 = false; // Presumably selects the active union member -- confirm
};


// Construct an Address from its textual form. The overloads taking an
// `std::error_code&` are `noexcept`; the others are not, so presumably they
// report failure by throwing -- confirm against the definitions.
Address make_address(const char* c_str);
Address make_address(const char* c_str, std::error_code& ec) noexcept;
Address make_address(const std::string& str);
Address make_address(const std::string& str, std::error_code& ec) noexcept;


/// \brief An IP endpoint.
///
/// An IP endpoint is a triplet (`protocol`, `address`, `port`).
class Endpoint { public: using port_type = std::uint_fast16_t; class List; StreamProtocol protocol() const; Address address() const; port_type port() const; Endpoint(); Endpoint(const StreamProtocol&, port_type); Endpoint(const Address&, port_type); ~Endpoint() noexcept {} using data_type = sockaddr; data_type* data(); const data_type* data() const; private: StreamProtocol m_protocol; using sockaddr_base_type = sockaddr; using sockaddr_ip_v4_type = sockaddr_in; using sockaddr_ip_v6_type = sockaddr_in6; union sockaddr_union_type { sockaddr_base_type m_base; sockaddr_ip_v4_type m_ip_v4; sockaddr_ip_v6_type m_ip_v6; }; sockaddr_union_type m_sockaddr_union; friend class Service; friend class Resolver; friend class SocketBase; friend class Socket; }; /// \brief A list of IP endpoints. class Endpoint::List { public: using iterator = const Endpoint*; iterator begin() const noexcept; iterator end() const noexcept; std::size_t size() const noexcept; bool empty() const noexcept; List() noexcept = default; List(List&&) noexcept = default; ~List() noexcept = default; List& operator=(List&&) noexcept = default; private: util::Buffer<Endpoint> m_endpoints; friend class Service; }; /// \brief TCP/IP networking service. class Service { public: Service(); ~Service() noexcept; /// \brief Execute the event loop. /// /// Execute completion handlers of completed asynchronous operations, or /// wait for more completion handlers to become ready for /// execution. Handlers submitted via post() are considered immeditely /// ready. If there are no completion handlers ready for execution, and /// there are no asynchronous operations in progress, run() returns. /// /// run_until_stopped() will continue running even if there are no completion /// handlers ready for execution, and no asynchronous operations in progress, /// until stop() is called. /// /// All completion handlers, including handlers submitted via post() will be /// executed from run(), that is, by the thread that executes run(). 
/// If no thread executes run(), then the completion handlers will not be
/// executed.
///
/// Exceptions thrown by completion handlers will always propagate back
/// through run().
///
/// Synchronous operations (e.g., Socket::connect()) execute independently of
/// the event loop, and do not require that any thread calls run().
void run();
void run_until_stopped();

/// @{ \brief Stop event loop execution.
///
/// stop() puts the event loop into the stopped mode. If a thread is
/// currently executing run(), it will be made to return in a timely
/// fashion, that is, without further blocking. If a thread is currently
/// blocked in run(), it will be unblocked. Handlers that can be executed
/// immediately, may, or may not be executed before run() returns, but new
/// handlers submitted by these, will not be executed before run()
/// returns. Also, if a handler is submitted by a call to post, and that
/// call happens after stop() returns, then that handler is guaranteed to
/// not be executed before run() returns (assuming that reset() is not called
/// before run() returns).
///
/// The event loop will remain in the stopped mode until reset() is
/// called. If reset() is called before run() returns, it may, or may not
/// cause run() to resume normal operation without returning.
///
/// Both stop() and reset() are thread-safe, that is, they may be called by
/// any thread. Also, both of these functions may be called from completion
/// handlers (including posted handlers).
void stop() noexcept;
void reset() noexcept;
/// @}

/// \brief Submit a handler to be executed by the event loop thread.
///
/// Register the specified completion handler for immediate asynchronous
/// execution. The specified handler will be executed by an expression of
/// the form `handler(status)` where status is a Status object whose value
/// will always be OK, but may change in the future. If the handler object
/// is movable, it will never be copied. Otherwise, it will be copied as
/// necessary.
///
/// This function is thread-safe, that is, it may be called by any
/// thread. It may also be called from other completion handlers.
///
/// The handler will never be called as part of the execution of post(). It
/// will always be called by a thread that is executing run(). If no thread
/// is currently executing run(), the handler will not be executed until a
/// thread starts executing run(). If post() is called while another thread
/// is executing run(), the handler may be called before post() returns. If
/// post() is called from another completion handler, the submitted handler
/// is guaranteed to not be called during the execution of post().
///
/// Completion handlers added through post() will be executed in the order
/// that they are added. More precisely, if post() is called twice to add
/// two handlers, A and B, and the execution of post(A) ends before the
/// beginning of the execution of post(B), then A is guaranteed to execute
/// before B.
template <class H>
void post(H handler);

/// Argument `saturation` is the fraction of time that is not spent
/// sleeping. Argument `inefficiency` is the fraction of time not spent
/// sleeping, and not spent executing completion handlers. Both values are
/// guaranteed to always be in the range 0 to 1 (both inclusive). The value
/// passed as `inefficiency` is guaranteed to always be less than, or equal
/// to the value passed as `saturation`.
using EventLoopMetricsHandler = void(double saturation, double inefficiency);

/// \brief Report event loop metrics via the specified handler.
///
/// The handler will be called approximately every 30 seconds.
///
/// report_event_loop_metrics() must be called prior to any invocation of
/// run(). report_event_loop_metrics() is not thread-safe.
///
/// This feature is only available if
/// `REALM_UTIL_NETWORK_EVENT_LOOP_METRICS` was defined during
/// compilation.
/// When the feature is not available, the specified handler will never be
/// called.
void report_event_loop_metrics(util::UniqueFunction<EventLoopMetricsHandler>);

private:
// What an I/O operation needs next. The documentation of
// Descriptor::initiate_oper() describes how `initiate()` reports it.
enum class Want { nothing = 0, read, write };

// Nested implementation types. They are only declared here; their
// definitions are not part of this class body.
template <class Oper>
class OperQueue;
class Descriptor;
class AsyncOper;
class ResolveOperBase;
class WaitOperBase;
class TriggerExecOperBase;
class PostOperBase;
template <class H>
class PostOper;
class IoOper;
class UnusedOper; // Allocated, but currently unused memory
template <class S>
class BasicStreamOps;

// Deleters used by the smart-pointer aliases below (defined out of line).
struct OwnersOperDeleter {
    void operator()(AsyncOper*) const noexcept;
};
struct LendersOperDeleter {
    void operator()(AsyncOper*) const noexcept;
};

// `std::unique_ptr` aliases. The "owners" and "lenders" flavors differ only
// in the deleter type; the remaining aliases differ in the pointee type.
using OwnersOperPtr = std::unique_ptr<AsyncOper, OwnersOperDeleter>;
using LendersOperPtr = std::unique_ptr<AsyncOper, LendersOperDeleter>;
using LendersResolveOperPtr = std::unique_ptr<ResolveOperBase, LendersOperDeleter>;
using LendersWaitOperPtr = std::unique_ptr<WaitOperBase, LendersOperDeleter>;
using LendersIoOperPtr = std::unique_ptr<IoOper, LendersOperDeleter>;

class IoReactor;
class Impl;
// Pointer to the implementation object; the pointer itself is const.
const std::unique_ptr<Impl> m_impl;

template <class Oper, class... Args>
static std::unique_ptr<Oper, LendersOperDeleter> alloc(OwnersOperPtr&, Args&&...);

// Function type taken by do_post(), and matched by post_oper_constr<H>().
using PostOperConstr = PostOperBase*(void* addr, std::size_t size, Impl&, void* cookie);
void do_post(PostOperConstr, std::size_t size, void* cookie);
template <class H>
static PostOperBase* post_oper_constr(void* addr, std::size_t size, Impl&, void* cookie);
static void recycle_post_oper(Impl&, PostOperBase*) noexcept;
static void trigger_exec(Impl&, TriggerExecOperBase&) noexcept;
static void reset_trigger_exec(Impl&, TriggerExecOperBase&) noexcept;

using clock = std::chrono::steady_clock;

// Classes that are granted access to the private types and helpers above.
friend class Resolver;
friend class SocketBase;
friend class Socket;
friend class Acceptor;
friend class DeadlineTimer;
friend class ReadAheadBuffer;
friend class ssl::Stream;
};


// Queue of operations held through `LendersOperPtr`. Only the interface is
// declared here; the member functions are defined out of line.
template <class Oper>
class Service::OperQueue {
public:
    using LendersOperPtr = std::unique_ptr<Oper, LendersOperDeleter>;
    bool empty() const noexcept;
    void push_back(LendersOperPtr) noexcept;
    template <class Oper2>
    void push_back(OperQueue<Oper2>&) noexcept;
    LendersOperPtr pop_front() noexcept;
    void clear() noexcept;
    OperQueue() noexcept = default;
    OperQueue(OperQueue&&) noexcept;
    // The destructor calls clear().
    ~OperQueue() noexcept
    {
        clear();
    }

private:
    Oper* m_back = nullptr;
    // Every specialization is a friend of every other one.
    template <class>
    friend class OperQueue;
};


class Service::Descriptor {
public:
#ifdef _WIN32
    using native_handle_type = SOCKET;
#else
    using native_handle_type = int;
#endif

    Impl& service_impl;

    Descriptor(Impl& service) noexcept;
    ~Descriptor() noexcept;

    /// \param in_blocking_mode Must be true if, and only if the passed file
    /// descriptor refers to a file description in which the file status flag
    /// O_NONBLOCK is not set.
    ///
    /// The passed file descriptor must have the file descriptor flag FD_CLOEXEC
    /// set.
void assign(native_handle_type fd, bool in_blocking_mode) noexcept;
void close() noexcept;
native_handle_type release() noexcept;

bool is_open() const noexcept;

native_handle_type native_handle() const noexcept;
bool in_blocking_mode() const noexcept;

void accept(Descriptor&, StreamProtocol, Endpoint*, std::error_code&) noexcept;
std::size_t read_some(char* buffer, std::size_t size, std::error_code&) noexcept;
std::size_t write_some(const char* data, std::size_t size, std::error_code&) noexcept;

/// \tparam Oper An operation type inherited from IoOper with an initiate()
/// function that initiates the operation and figures out whether it needs
/// to read from, or write to the underlying descriptor to
/// proceed. `initiate()` must return Want::read if the operation needs to
/// read, or Want::write if the operation needs to write. If the operation
/// completes immediately (e.g. due to a failure during initialization),
/// `initiate()` must return Want::nothing.
template <class Oper, class... Args>
void initiate_oper(std::unique_ptr<Oper, LendersOperDeleter>, Args&&...);

// Unlike most other members, these two are not declared `noexcept`.
void ensure_blocking_mode();
void ensure_nonblocking_mode();

private:
native_handle_type m_fd = -1;
bool m_in_blocking_mode; // Not in nonblocking mode

// The members in this group exist only in epoll or kqueue builds.
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
bool m_read_ready;
bool m_write_ready;
bool m_imminent_end_of_input; // Kernel has seen the end of input
bool m_is_registered;
OperQueue<IoOper> m_suspended_read_ops, m_suspended_write_ops;

void deregister_for_async() noexcept;
#endif

bool assume_read_would_block() const noexcept;
bool assume_write_would_block() const noexcept;

void set_read_ready(bool) noexcept;
void set_write_ready(bool) noexcept;

void set_nonblock_flag(bool value);
void add_initiated_oper(LendersIoOperPtr, Want);

void do_close() noexcept;
native_handle_type do_release() noexcept;

friend class IoReactor;
};


class Resolver {
public:
    class Query;

    Resolver(Service&);
    ~Resolver() noexcept;

    /// Thread-safe.
Service& get_service() noexcept;

/// @{ \brief Resolve the specified query to one or more endpoints.
Endpoint::List resolve(const Query&);
Endpoint::List resolve(const Query&, std::error_code&);
/// @}

/// \brief Perform an asynchronous resolve operation.
///
/// Initiate an asynchronous resolve operation. The completion handler will
/// be called when the operation completes. The operation completes when it
/// succeeds, or an error occurs.
///
/// The completion handler is always executed by the event loop thread,
/// i.e., by a thread that is executing Service::run(). Conversely, the
/// completion handler is guaranteed to not be called while no thread is
/// executing Service::run(). The execution of the completion handler is
/// always deferred to the event loop, meaning that it never happens as a
/// synchronous side effect of the execution of async_resolve(), even when
/// async_resolve() is executed by the event loop thread. The completion
/// handler is guaranteed to be called eventually, as long as there is time
/// enough for the operation to complete or fail, and a thread is executing
/// Service::run() for long enough.
///
/// The operation can be canceled by calling cancel(), and will be
/// automatically canceled if the resolver object is destroyed. If the
/// operation is canceled, it will fail with `error::operation_aborted`. The
/// operation remains cancelable up until the point in time where the
/// completion handler starts to execute. This means that if cancel() is
/// called before the completion handler starts to execute, then the
/// completion handler is guaranteed to have `error::operation_aborted`
/// passed to it. This is true regardless of whether cancel() is called
/// explicitly or implicitly, such as when the resolver is destroyed.
///
/// The specified handler will be executed by an expression of the form
/// `handler(ec, endpoints)` where `ec` is the error code and `endpoints` is
/// an object of type `Endpoint::List`. If the handler object is
/// movable, it will never be copied. Otherwise, it will be copied as
/// necessary.
///
/// It is an error to start a new resolve operation (synchronous or
/// asynchronous) while an asynchronous resolve operation is in progress via
/// the same resolver object. An asynchronous resolve operation is
/// considered complete as soon as the completion handler starts to
/// execute. This means that a new resolve operation can be started from the
/// completion handler.
template <class H>
void async_resolve(Query, H&& handler);

/// \brief Cancel all asynchronous operations.
///
/// Cause all incomplete asynchronous operations, that are associated with
/// this resolver (at most one), to fail with `error::operation_aborted`. An
/// asynchronous operation is complete precisely when its completion handler
/// starts executing.
///
/// Completion handlers of canceled operations will become immediately ready
/// to execute, but will never be executed directly as part of the execution
/// of cancel().
///
/// Cancellation happens automatically when the resolver object is destroyed.
void cancel() noexcept;

private:
template <class H>
class ResolveOper;

Service::Impl& m_service_impl;

// Owning handle for the resolve operation; the public documentation states
// that at most one asynchronous resolve operation exists per resolver.
Service::OwnersOperPtr m_resolve_oper;

void initiate_oper(Service::LendersResolveOperPtr);
};


// Describes what to resolve. The private members show that it stores flags,
// a protocol, a host name and a service (port) string.
class Resolver::Query {
public:
    // Flag values, taken directly from the `AI_*` constants.
    enum {
        /// Locally bound socket endpoint (server side)
        passive = AI_PASSIVE,

        /// Ignore families without a configured non-loopback address
        address_configured = AI_ADDRCONFIG
    };

    // The two overloads without a host name default to
    // `passive | address_configured`; the two with a host name default to
    // `address_configured` only.
    Query(std::string service_port, int init_flags = passive | address_configured);
    Query(const StreamProtocol&, std::string service_port, int init_flags = passive | address_configured);
    Query(std::string host_name, std::string service_port, int init_flags = address_configured);
    Query(const StreamProtocol&, std::string host_name, std::string service_port,
          int init_flags = address_configured);

    ~Query() noexcept;

    // Accessors (defined out of line).
    int flags() const;
    StreamProtocol protocol() const;
    std::string host() const;
    std::string service() const;

private:
    int m_flags;
    StreamProtocol m_protocol;
    std::string m_host;    // hostname
    std::string m_service; // port

    friend class Service;
};


class SocketBase {
public:
    using native_handle_type = Service::Descriptor::native_handle_type;

    ~SocketBase() noexcept;

    /// Thread-safe.
    Service& get_service() noexcept;

    bool is_open() const noexcept;
    native_handle_type native_handle() const noexcept;

    /// @{ \brief Open the socket for use with the specified protocol.
    ///
    /// It is an error to call open() on a socket that is already open.
    void open(const StreamProtocol&);
    std::error_code open(const StreamProtocol&, std::error_code&);
    /// @}

    /// \brief Close this socket.
    ///
    /// If the socket is open, it will be closed. If it is already closed (or
    /// never opened), this function does nothing (idempotency).
    ///
    /// A socket is automatically closed when destroyed.
    ///
    /// When the socket is closed, any incomplete asynchronous operation will be
    /// canceled (as if cancel() was called).
    void close() noexcept;

    /// \brief Cancel all asynchronous operations.
/// /// Cause all incomplete asynchronous operations, that are associated with /// this socket, to fail with `error::operation_aborted`. An asynchronous /// operation is complete precisely when its completion handler starts /// executing. /// /// Completion handlers of canceled operations will become immediately ready /// to execute, but will never be executed directly as part of the execution /// of cancel(). void cancel() noexcept; template <class O> void get_option(O& opt) const; template <class O> std::error_code get_option(O& opt, std::error_code&) const; template <class O> void set_option(const O& opt); template <class O> std::error_code set_option(const O& opt, std::error_code&); void bind(const Endpoint&); std::error_code bind(const Endpoint&, std::error_code&); Endpoint local_endpoint() const; Endpoint local_endpoint(std::error_code&) const; /// Release the ownership of this socket object over the native handle and /// return the native handle to the caller. The caller assumes ownership /// over the returned handle. The socket is left in a closed /// state. Incomplete asynchronous operations will be canceled as if close() /// had been called. /// /// If called on a closed socket, this function is a no-op, and returns the /// same value as would be returned by native_handle() native_handle_type release_native_handle() noexcept; private: enum opt_enum { opt_ReuseAddr, ///< `SOL_SOCKET`, `SO_REUSEADDR` opt_Linger, ///< `SOL_SOCKET`, `SO_LINGER` opt_NoDelay, ///< `IPPROTO_TCP`, `TCP_NODELAY` (disable the Nagle algorithm) }; template <class, int, class> class Option; public: using reuse_address = Option<bool, opt_ReuseAddr, int>; using no_delay = Option<bool, opt_NoDelay, int>; // linger struct defined by POSIX sys/socket.h. 
struct linger_opt; using linger = Option<linger_opt, opt_Linger, struct linger>; protected: Service::Descriptor m_desc; private: StreamProtocol m_protocol; protected: Service::OwnersOperPtr m_read_oper; // Read or accept Service::OwnersOperPtr m_write_oper; // Write or connect SocketBase(Service&); const StreamProtocol& get_protocol() const noexcept; std::error_code do_assign(const StreamProtocol&, native_handle_type, std::error_code&); void do_close() noexcept; void get_option(opt_enum, void* value_data, std::size_t& value_size, std::error_code&) const; void set_option(opt_enum, const void* value_data, std::size_t value_size, std::error_code&); void map_option(opt_enum, int& level, int& option_name) const; friend class Acceptor; }; template <class T, int opt, class U> class SocketBase::Option { public: Option(T value = T()); T value() const; private: T m_value; void get(const SocketBase&, std::error_code&); void set(SocketBase&, std::error_code&) const; friend class SocketBase; }; struct SocketBase::linger_opt { linger_opt(bool enable, int timeout_seconds = 0) { m_linger.l_onoff = enable ? 1 : 0; m_linger.l_linger = timeout_seconds; } ::linger m_linger; operator ::linger() const { return m_linger; } bool enabled() const { return m_linger.l_onoff != 0; } int timeout() const { return m_linger.l_linger; } }; /// Switching between synchronous and asynchronous operations is allowed, but /// only in a nonoverlapping fashion. That is, a synchronous operation is not /// allowed to run concurrently with an asynchronous one on the same /// socket. Note that an asynchronous operation is considered to be running /// until its completion handler starts executing. class Socket : public SocketBase { public: Socket(Service&); /// \brief Create a socket with an already-connected native socket handle. 
/// /// This constructor is shorthand for creating the socket with the /// one-argument constructor, and then calling the two-argument assign() /// with the specified protocol and native handle. Socket(Service&, const StreamProtocol&, native_handle_type); ~Socket() noexcept; void connect(const Endpoint&); std::error_code connect(const Endpoint&, std::error_code&); /// @{ \brief Perform a synchronous read operation. /// /// read() will not return until the specified buffer is full, or an error /// occurs. Reaching the end of input before the buffer is filled, is /// considered an error, and will cause the operation to fail with /// MiscExtErrors::end_of_input. /// /// read_until() will not return until the specified buffer contains the /// specified delimiter, or an error occurs. If the buffer is filled before /// the delimiter is found, the operation fails with /// MiscExtErrors::delim_not_found. Otherwise, if the end of input is /// reached before the delimiter is found, the operation fails with /// MiscExtErrors::end_of_input. If the operation succeeds, the last byte /// placed in the buffer is the delimiter. /// /// The versions that take a ReadAheadBuffer argument will read through that /// buffer. This allows for fewer larger reads on the underlying /// socket. Since unconsumed data may be left in the read-ahead buffer after /// a read operation returns, it is important that the same read-ahead /// buffer is passed to the next read operation. /// /// The versions of read() and read_until() that do not take an /// `std::error_code&` argument will throw std::system_error on failure. /// /// The versions that do take an `std::error_code&` argument will set \a ec /// to `std::error_code()` on success, and to something else on failure. On /// failure they will return the number of bytes placed in the specified /// buffer before the error occured. /// /// \return The number of bytes places in the specified buffer upon return. 
std::size_t read(char* buffer, std::size_t size); std::size_t read(char* buffer, std::size_t size, std::error_code& ec); std::size_t read(char* buffer, std::size_t size, ReadAheadBuffer&); std::size_t read(char* buffer, std::size_t size, ReadAheadBuffer&, std::error_code& ec); std::size_t read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer&); std::size_t read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer&, std::error_code& ec); /// @} /// @{ \brief Perform a synchronous write operation. /// /// write() will not return until all the specified bytes have been written /// to the socket, or an error occurs. /// /// The versions of write() that does not take an `std::error_code&` /// argument will throw std::system_error on failure. When it succeeds, it /// always returns \a size. /// /// The versions that does take an `std::error_code&` argument will set \a /// ec to `std::error_code()` on success, and to something else on /// failure. On success it returns \a size. On faulure it returns the number /// of bytes written before the failure occured. std::size_t write(const char* data, std::size_t size); std::size_t write(const char* data, std::size_t size, std::error_code& ec); /// @} /// @{ \brief Read at least one byte from this socket. /// /// If \a size is zero, both versions of read_some() will return zero /// without blocking. Read errors may or may not be detected in this case. /// /// Otherwise, if \a size is greater than zero, and at least one byte is /// immediately available, that is, without blocking, then both versions /// will read at least one byte (but generally as many immediately available /// bytes as will fit into the specified buffer), and return without /// blocking. /// /// Otherwise, both versions will block the calling thread until at least one /// byte becomes available, or an error occurs. 
/// /// In this context, it counts as an error, if the end of input is reached /// before at least one byte becomes available (see /// MiscExtErrors::end_of_input). /// /// If no error occurs, both versions will return the number of bytes placed /// in the specified buffer, which is generally as many as are immediately /// available at the time when the first byte becomes available, although /// never more than \a size. /// /// If no error occurs, the three-argument version will set \a ec to /// indicate success. /// /// If an error occurs, the two-argument version will throw /// `std::system_error`, while the three-argument version will set \a ec to /// indicate the error, and return zero. /// /// As long as \a size is greater than zero, the two argument version will /// always return a value that is greater than zero, while the three /// argument version will return a value greater than zero when, and only /// when \a ec is set to indicate success (no error, and no end of input). std::size_t read_some(char* buffer, std::size_t size); std::size_t read_some(char* buffer, std::size_t size, std::error_code& ec); /// @} /// @{ \brief Write at least one byte to this socket. /// /// If \a size is zero, both versions of write_some() will return zero /// without blocking. Write errors may or may not be detected in this case. /// /// Otherwise, if \a size is greater than zero, and at least one byte can be /// written immediately, that is, without blocking, then both versions will /// write at least one byte (but generally as many as can be written /// immediately), and return without blocking. /// /// Otherwise, both versions will block the calling thread until at least one /// byte can be written, or an error occurs. /// /// If no error occurs, both versions will return the number of bytes /// written, which is generally as many as can be written immediately at the /// time when the first byte can be written. 
/// /// If no error occurs, the three-argument version will set \a ec to /// indicate success. /// /// If an error occurs, the two-argument version will throw /// `std::system_error`, while the three-argument version will set \a ec to /// indicate the error, and return zero. /// /// As long as \a size is greater than zero, the two argument version will /// always return a value that is greater than zero, while the three /// argument version will return a value greater than zero when, and only /// when \a ec is set to indicate success. std::size_t write_some(const char* data, std::size_t size); std::size_t write_some(const char* data, std::size_t size, std::error_code&); /// @} /// \brief Perform an asynchronous connect operation. /// /// Initiate an asynchronous connect operation. The completion handler is /// called when the operation completes. The operation completes when the /// connection is established, or an error occurs. /// /// The completion handler is always executed by the event loop thread, /// i.e., by a thread that is executing Service::run(). Conversely, the /// completion handler is guaranteed to not be called while no thread is /// executing Service::run(). The execution of the completion handler is /// always deferred to the event loop, meaning that it never happens as a /// synchronous side effect of the execution of async_connect(), even when /// async_connect() is executed by the event loop thread. The completion /// handler is guaranteed to be called eventually, as long as there is time /// enough for the operation to complete or fail, and a thread is executing /// Service::run() for long enough. /// /// The operation can be canceled by calling cancel(), and will be /// automatically canceled if the socket is closed. If the operation is /// canceled, it will fail with `error::operation_aborted`. The operation /// remains cancelable up until the point in time where the completion /// handler starts to execute. 
This means that if cancel() is called before /// the completion handler starts to execute, then the completion handler is /// guaranteed to have `error::operation_aborted` passed to it. This is true /// regardless of whether cancel() is called explicitly or implicitly, such /// as when the socket is destroyed. /// /// If the socket is not already open, it will be opened as part of the /// connect operation as if by calling `open(ep.protocol())`. If the opening /// operation succeeds, but the connect operation fails, the socket will be /// left in the opened state. /// /// The specified handler will be executed by an expression on the form /// `handler(ec)` where `ec` is the error code. If the the handler object is /// movable, it will never be copied. Otherwise, it will be copied as /// necessary. /// /// It is an error to start a new connect operation (synchronous or /// asynchronous) while an asynchronous connect operation is in progress. An /// asynchronous connect operation is considered complete as soon as the /// completion handler starts to execute. /// /// \param ep The remote endpoint of the connection to be established. template <class H> void async_connect(const Endpoint& ep, H&& handler); /// @{ \brief Perform an asynchronous read operation. /// /// Initiate an asynchronous buffered read operation on the associated /// socket. The completion handler will be called when the operation /// completes, or an error occurs. /// /// async_read() will continue reading until the specified buffer is full, /// or an error occurs. If the end of input is reached before the buffer is /// filled, the operation fails with MiscExtErrors::end_of_input. /// /// async_read_until() will continue reading until the specified buffer /// contains the specified delimiter, or an error occurs. If the buffer is /// filled before a delimiter is found, the operation fails with /// MiscExtErrors::delim_not_found. 
Otherwise, if the end of input is /// reached before a delimiter is found, the operation fails with /// MiscExtErrors::end_of_input. Otherwise, if the operation succeeds, the /// last byte placed in the buffer is the delimiter. /// /// The versions that take a ReadAheadBuffer argument will read through that /// buffer. This allows for fewer larger reads on the underlying /// socket. Since unconsumed data may be left in the read-ahead buffer after /// a read operation completes, it is important that the same read-ahead /// buffer is passed to the next read operation. /// /// The completion handler is always executed by the event loop thread, /// i.e., by a thread that is executing Service::run(). Conversely, the /// completion handler is guaranteed to not be called while no thread is /// executing Service::run(). The execution of the completion handler is /// always deferred to the event loop, meaning that it never happens as a /// synchronous side effect of the execution of async_read() or /// async_read_until(), even when async_read() or async_read_until() is /// executed by the event loop thread. The completion handler is guaranteed /// to be called eventually, as long as there is time enough for the /// operation to complete or fail, and a thread is executing Service::run() /// for long enough. /// /// The operation can be canceled by calling cancel() on the associated /// socket, and will be automatically canceled if the associated socket is /// closed. If the operation is canceled, it will fail with /// `error::operation_aborted`. The operation remains cancelable up until /// the point in time where the completion handler starts to execute. This /// means that if cancel() is called before the completion handler starts to /// execute, then the completion handler is guaranteed to have /// `error::operation_aborted` passed to it. This is true regardless of /// whether cancel() is called explicitly or implicitly, such as when the /// socket is destroyed. 
/// /// The specified handler will be executed by an expression on the form /// `handler(ec, n)` where `ec` is the error code, and `n` is the number of /// bytes placed in the buffer (of type `std::size_t`). `n` is guaranteed to /// be less than, or equal to \a size. If the the handler object is movable, /// it will never be copied. Otherwise, it will be copied as necessary. /// /// It is an error to start a read operation before the associated socket is /// connected. /// /// It is an error to start a new read operation (synchronous or /// asynchronous) while an asynchronous read operation is in progress. An /// asynchronous read operation is considered complete as soon as the /// completion handler starts executing. This means that a new read /// operation can be started from the completion handler of another /// asynchronous buffered read operation. template <class H> void async_read(char* buffer, std::size_t size, H&& handler); template <class H> void async_read(char* buffer, std::size_t size, ReadAheadBuffer&, H&& handler); template <class H> void async_read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer&, H&& handler); /// @} /// \brief Perform an asynchronous write operation. /// /// Initiate an asynchronous write operation. The completion handler is /// called when the operation completes. The operation completes when all /// the specified bytes have been written to the socket, or an error occurs. /// /// The completion handler is always executed by the event loop thread, /// i.e., by a thread that is executing Service::run(). Conversely, the /// completion handler is guaranteed to not be called while no thread is /// executing Service::run(). The execution of the completion handler is /// always deferred to the event loop, meaning that it never happens as a /// synchronous side effect of the execution of async_write(), even when /// async_write() is executed by the event loop thread. 
The completion /// handler is guaranteed to be called eventually, as long as there is time /// enough for the operation to complete or fail, and a thread is executing /// Service::run() for long enough. /// /// The operation can be canceled by calling cancel(), and will be /// automatically canceled if the socket is closed. If the operation is /// canceled, it will fail with `error::operation_aborted`. The operation /// remains cancelable up until the point in time where the completion /// handler starts to execute. This means that if cancel() is called before /// the completion handler starts to execute, then the completion handler is /// guaranteed to have `error::operation_aborted` passed to it. This is true /// regardless of whether cancel() is called explicitly or implicitly, such /// as when the socket is destroyed. /// /// The specified handler will be executed by an expression on the form /// `handler(ec, n)` where `ec` is the error code, and `n` is the number of /// bytes written (of type `std::size_t`). If the the handler object is /// movable, it will never be copied. Otherwise, it will be copied as /// necessary. /// /// It is an error to start an asynchronous write operation before the /// socket is connected. /// /// It is an error to start a new write operation (synchronous or /// asynchronous) while an asynchronous write operation is in progress. An /// asynchronous write operation is considered complete as soon as the /// completion handler starts to execute. This means that a new write /// operation can be started from the completion handler of another /// asynchronous write operation. template <class H> void async_write(const char* data, std::size_t size, H&& handler); template <class H> void async_read_some(char* buffer, std::size_t size, H&& handler); template <class H> void async_write_some(const char* data, std::size_t size, H&& handler); enum shutdown_type { #ifdef _WIN32 /// Shutdown the receiving side of the socket. 
shutdown_receive = SD_RECEIVE, /// Shutdown the sending side of the socket. shutdown_send = SD_SEND, /// Shutdown both sending and receiving side of the socket. shutdown_both = SD_BOTH #else shutdown_receive = SHUT_RD, shutdown_send = SHUT_WR, shutdown_both = SHUT_RDWR #endif }; /// @{ \brief Shut down the connected sockets sending and/or receiving /// side. /// /// It is an error to call this function when the socket is not both open /// and connected. void shutdown(shutdown_type); std::error_code shutdown(shutdown_type, std::error_code&); /// @} /// @{ \brief Initialize socket with an already-connected native socket /// handle. /// /// The specified native handle must refer to a socket that is already fully /// open and connected. /// /// If the assignment operation succeeds, this socket object has taken /// ownership of the specified native handle, and the handle will be closed /// when the socket object is destroyed, (or when close() is called). If the /// operation fails, the caller still owns the specified native handle. /// /// It is an error to call connect() or async_connect() on a socket object /// that is initialized this way (unless it is first closed). /// /// It is an error to call this function on a socket object that is already /// open. void assign(const StreamProtocol&, native_handle_type); std::error_code assign(const StreamProtocol&, native_handle_type, std::error_code&); /// @} /// Returns a reference to this socket, as this socket is the lowest layer /// of a stream. 
Socket& lowest_layer() noexcept; private: using Want = Service::Want; using StreamOps = Service::BasicStreamOps<Socket>; class ConnectOperBase; template <class H> class ConnectOper; using LendersConnectOperPtr = std::unique_ptr<ConnectOperBase, Service::LendersOperDeleter>; // `ec` untouched on success, but no immediate completion bool initiate_async_connect(const Endpoint&, std::error_code& ec); // `ec` untouched on success std::error_code finalize_async_connect(std::error_code& ec) noexcept; // See Service::BasicStreamOps for details on these these 6 functions. void do_init_read_async(std::error_code&, Want&) noexcept; void do_init_write_async(std::error_code&, Want&) noexcept; std::size_t do_read_some_sync(char* buffer, std::size_t size, std::error_code&) noexcept; std::size_t do_write_some_sync(const char* data, std::size_t size, std::error_code&) noexcept; std::size_t do_read_some_async(char* buffer, std::size_t size, std::error_code&, Want&) noexcept; std::size_t do_write_some_async(const char* data, std::size_t size, std::error_code&, Want&) noexcept; friend class Service::BasicStreamOps<Socket>; friend class Service::BasicStreamOps<ssl::Stream>; friend class ReadAheadBuffer; friend class ssl::Stream; }; /// Switching between synchronous and asynchronous operations is allowed, but /// only in a nonoverlapping fashion. That is, a synchronous operation is not /// allowed to run concurrently with an asynchronous one on the same /// acceptor. Note that an asynchronous operation is considered to be running /// until its completion handler starts executing. 
class Acceptor : public SocketBase {
public:
    Acceptor(Service&);
    ~Acceptor() noexcept;

    /// Default backlog used by listen(); defined as the platform's SOMAXCONN.
    static constexpr int max_connections = SOMAXCONN;

    // NOTE(review): the listen() and accept() overloads below carry no
    // documentation of their own. Presumably the overloads without a
    // `std::error_code&` parameter throw `std::system_error` on failure, as
    // documented for the synchronous Socket operations -- confirm against the
    // implementation.
    void listen(int backlog = max_connections);
    std::error_code listen(int backlog, std::error_code&);

    void accept(Socket&);
    void accept(Socket&, Endpoint&);
    std::error_code accept(Socket&, std::error_code&);
    std::error_code accept(Socket&, Endpoint&, std::error_code&);

    /// @{ \brief Perform an asynchronous accept operation.
    ///
    /// Initiate an asynchronous accept operation. The completion handler will
    /// be called when the operation completes. The operation completes when the
    /// connection is accepted, or an error occurs. If the operation succeeds,
    /// the specified local socket will have become connected to a remote
    /// socket.
    ///
    /// The completion handler is always executed by the event loop thread,
    /// i.e., by a thread that is executing Service::run(). Conversely, the
    /// completion handler is guaranteed to not be called while no thread is
    /// executing Service::run(). The execution of the completion handler is
    /// always deferred to the event loop, meaning that it never happens as a
    /// synchronous side effect of the execution of async_accept(), even when
    /// async_accept() is executed by the event loop thread. The completion
    /// handler is guaranteed to be called eventually, as long as there is time
    /// enough for the operation to complete or fail, and a thread is executing
    /// Service::run() for long enough.
    ///
    /// The operation can be canceled by calling cancel(), and will be
    /// automatically canceled if the acceptor is closed. If the operation is
    /// canceled, it will fail with `error::operation_aborted`. The operation
    /// remains cancelable up until the point in time where the completion
    /// handler starts to execute. This means that if cancel() is called before
    /// the completion handler starts to execute, then the completion handler is
    /// guaranteed to have `error::operation_aborted` passed to it. This is true
    /// regardless of whether cancel() is called explicitly or implicitly, such
    /// as when the acceptor is destroyed.
    ///
    /// The specified handler will be executed by an expression on the form
    /// `handler(ec)` where `ec` is the error code. If the handler object is
    /// movable, it will never be copied. Otherwise, it will be copied as
    /// necessary.
    ///
    /// It is an error to start a new accept operation (synchronous or
    /// asynchronous) while an asynchronous accept operation is in progress. An
    /// asynchronous accept operation is considered complete as soon as the
    /// completion handler starts executing. This means that a new accept
    /// operation can be started from the completion handler.
    ///
    /// \param sock This is the local socket, that upon successful completion
    /// will have become connected to the remote socket. It must be in the
    /// closed state (Socket::is_open()) when async_accept() is called.
    ///
    template <class H>
    void async_accept(Socket& sock, H&& handler);
    /// \param ep Upon completion, the remote peer endpoint will have been
    /// assigned to this variable.
    template <class H>
    void async_accept(Socket& sock, Endpoint& ep, H&& handler);
    /// @}

private:
    using Want = Service::Want;

    class AcceptOperBase;
    template <class H>
    class AcceptOper;

    using LendersAcceptOperPtr = std::unique_ptr<AcceptOperBase, Service::LendersOperDeleter>;

    // Pointer-taking variants: the public overloads above come with and without
    // an Endpoint argument, and these take it as a pointer instead.
    // NOTE(review): presumably the public overloads forward here with a null
    // pointer when no endpoint is requested -- confirm in the implementation.
    std::error_code accept(Socket&, Endpoint*, std::error_code&);
    Want do_accept_async(Socket&, Endpoint*, std::error_code&) noexcept;

    template <class H>
    void async_accept(Socket&, Endpoint*, H&&);
};


/// \brief A timer object supporting asynchronous wait operations.
class DeadlineTimer {
public:
    DeadlineTimer(Service&);
    ~DeadlineTimer() noexcept;

    /// Thread-safe.
    Service& get_service() noexcept;

    /// \brief Perform an asynchronous wait operation.
    ///
    /// Initiate an asynchronous wait operation.
/// The completion handler becomes
    /// ready to execute when the expiration time is reached, or an error occurs
    /// (cancellation counts as an error here). The expiration time is the time
    /// of initiation plus the specified delay. The error code passed to the
    /// completion handler will **never** indicate success, unless the
    /// expiration time was reached.
    ///
    /// The completion handler is always executed by the event loop thread,
    /// i.e., by a thread that is executing Service::run(). Conversely, the
    /// completion handler is guaranteed to not be called while no thread is
    /// executing Service::run(). The execution of the completion handler is
    /// always deferred to the event loop, meaning that it never happens as a
    /// synchronous side effect of the execution of async_wait(), even when
    /// async_wait() is executed by the event loop thread. The completion
    /// handler is guaranteed to be called eventually, as long as there is time
    /// enough for the operation to complete or fail, and a thread is executing
    /// Service::run() for long enough.
    ///
    /// The operation can be canceled by calling cancel(), and will be
    /// automatically canceled if the timer is destroyed. If the operation is
    /// canceled, it will fail with `ErrorCodes::OperationAborted`. The operation
    /// remains cancelable up until the point in time where the completion
    /// handler starts to execute. This means that if cancel() is called before
    /// the completion handler starts to execute, then the completion handler is
    /// guaranteed to have `ErrorCodes::OperationAborted` passed to it. This is true
    /// regardless of whether cancel() is called explicitly or implicitly, such
    /// as when the timer is destroyed.
    ///
    /// The specified handler will be executed by an expression on the form
    /// `handler(status)` where `status` is a Status object. If the handler object
    /// is movable, it will never be copied. Otherwise, it will be copied as
    /// necessary.
    ///
    /// It is an error to start a new asynchronous wait operation while
    /// another one is in progress. An asynchronous wait operation is in
    /// progress until its completion handler starts executing.
    template <class R, class P, class H>
    void async_wait(std::chrono::duration<R, P> delay, H&& handler);

    /// \brief Cancel an asynchronous wait operation.
    ///
    /// If an asynchronous wait operation, that is associated with this deadline
    /// timer, is in progress, cause it to fail with
    /// `ErrorCodes::OperationAborted` (the same status that async_wait()
    /// documents for canceled operations). An asynchronous wait operation is in
    /// progress until its completion handler starts executing.
    ///
    /// Completion handlers of canceled operations will become immediately ready
    /// to execute, but will never be executed directly as part of the execution
    /// of cancel().
    ///
    /// Cancellation happens automatically when the timer object is destroyed.
    void cancel() noexcept;

private:
    template <class H>
    class WaitOper;

    using clock = Service::clock;

    Service::Impl& m_service_impl;
    Service::OwnersOperPtr m_wait_oper;

    void initiate_oper(Service::LendersWaitOperPtr);
};


/// Buffer that the read operations taking a ReadAheadBuffer argument read
/// through, which allows for fewer, larger reads on the underlying socket.
/// Unconsumed data may remain in it after a read completes, so the same buffer
/// must be passed to the next read operation.
class ReadAheadBuffer {
public:
    ReadAheadBuffer();

    /// Discard any buffered data.
    void clear() noexcept;

private:
    using Want = Service::Want;

    // NOTE(review): m_begin / m_end presumably delimit the not-yet-consumed
    // bytes inside m_buffer, and s_size is presumably the number of bytes
    // allocated for m_buffer -- confirm against the constructor and read().
    char* m_begin = nullptr;
    char* m_end = nullptr;
    static constexpr std::size_t s_size = 1024;
    const std::unique_ptr<char[]> m_buffer;

    bool empty() const noexcept;
    bool read(char*& begin, char* end, int delim, std::error_code&) noexcept;
    template <class S>
    void refill_sync(S& stream, std::error_code&) noexcept;
    template <class S>
    bool refill_async(S& stream, std::error_code&, Want&) noexcept;

    // The private interface above is used by the stream operation helpers.
    template <class>
    friend class Service::BasicStreamOps;
};


/// Error conditions reported by host name resolution. Registered as an error
/// code enum through the std::is_error_code_enum specialization that follows
/// the enum.
enum class ResolveErrors {
    /// Host not found (authoritative).
    host_not_found = 1,

    /// Host not found (non-authoritative).
    host_not_found_try_again = 2,

    /// The query is valid but does not have associated address data.
    no_data = 3,

    /// A non-recoverable error occurred.
no_recovery = 4, /// The service is not supported for the given socket type. service_not_found = 5, /// The socket type is not supported. socket_type_not_supported = 6, }; /// The error category associated with ResolveErrors. The name of this category is /// `realm.sync.network.resolve`. const std::error_category& resolve_error_category() noexcept; std::error_code make_error_code(ResolveErrors err); } // namespace realm::sync::network namespace std { template <> class is_error_code_enum<realm::sync::network::ResolveErrors> { public: static const bool value = true; }; } // namespace std namespace realm::sync::network { // Implementation // ---------------- StreamProtocol ---------------- inline StreamProtocol StreamProtocol::ip_v4() { StreamProtocol prot; prot.m_family = AF_INET; return prot; } inline StreamProtocol StreamProtocol::ip_v6() { StreamProtocol prot; prot.m_family = AF_INET6; return prot; } inline bool StreamProtocol::is_ip_v4() const { return m_family == AF_INET; } inline bool StreamProtocol::is_ip_v6() const { return m_family == AF_INET6; } inline int StreamProtocol::family() const { return m_family; } inline int StreamProtocol::protocol() const { return m_protocol; } inline StreamProtocol::StreamProtocol() : m_family{AF_UNSPEC} , // Allow both IPv4 and IPv6 m_socktype{SOCK_STREAM} , // Or SOCK_DGRAM for UDP m_protocol{0} // Any protocol { } // ---------------- Address ---------------- inline bool Address::is_ip_v4() const { return !m_is_ip_v6; } inline bool Address::is_ip_v6() const { return m_is_ip_v6; } template <class C, class T> inline std::basic_ostream<C, T>& operator<<(std::basic_ostream<C, T>& out, const Address& addr) { // FIXME: Not taking `addr.m_ip_v6_scope_id` into account. What does ASIO // do? union buffer_union { char ip_v4[INET_ADDRSTRLEN]; char ip_v6[INET6_ADDRSTRLEN]; }; char buffer[sizeof(buffer_union)]; int af = addr.m_is_ip_v6 ? 
AF_INET6 : AF_INET; #ifdef _WIN32 void* src = const_cast<void*>(reinterpret_cast<const void*>(&addr.m_union)); #else const void* src = &addr.m_union; #endif const char* ret = ::inet_ntop(af, src, buffer, sizeof buffer); if (ret == 0) { std::error_code ec = util::make_basic_system_error_code(errno); throw std::system_error(ec); } out << ret; return out; } inline Address::Address() { m_union.m_ip_v4 = ip_v4_type(); } inline Address make_address(const char* c_str) { std::error_code ec; Address addr = make_address(c_str, ec); if (ec) throw std::system_error(ec); return addr; } inline Address make_address(const std::string& str) { std::error_code ec; Address addr = make_address(str, ec); if (ec) throw std::system_error(ec); return addr; } inline Address make_address(const std::string& str, std::error_code& ec) noexcept { return make_address(str.c_str(), ec); } // ---------------- Endpoint ---------------- inline StreamProtocol Endpoint::protocol() const { return m_protocol; } inline Address Endpoint::address() const { Address addr; if (m_protocol.is_ip_v4()) { addr.m_union.m_ip_v4 = m_sockaddr_union.m_ip_v4.sin_addr; } else { addr.m_union.m_ip_v6 = m_sockaddr_union.m_ip_v6.sin6_addr; addr.m_ip_v6_scope_id = m_sockaddr_union.m_ip_v6.sin6_scope_id; addr.m_is_ip_v6 = true; } return addr; } inline Endpoint::port_type Endpoint::port() const { return ntohs(m_protocol.is_ip_v4() ? 
m_sockaddr_union.m_ip_v4.sin_port : m_sockaddr_union.m_ip_v6.sin6_port); } inline Endpoint::data_type* Endpoint::data() { return &m_sockaddr_union.m_base; } inline const Endpoint::data_type* Endpoint::data() const { return &m_sockaddr_union.m_base; } inline Endpoint::Endpoint() : Endpoint{StreamProtocol::ip_v4(), 0} { } inline Endpoint::Endpoint(const StreamProtocol& protocol, port_type port) : m_protocol{protocol} { int family = m_protocol.family(); if (family == AF_INET) { m_sockaddr_union.m_ip_v4 = sockaddr_ip_v4_type(); // Clear m_sockaddr_union.m_ip_v4.sin_family = AF_INET; m_sockaddr_union.m_ip_v4.sin_port = htons(port); m_sockaddr_union.m_ip_v4.sin_addr.s_addr = INADDR_ANY; } else if (family == AF_INET6) { m_sockaddr_union.m_ip_v6 = sockaddr_ip_v6_type(); // Clear m_sockaddr_union.m_ip_v6.sin6_family = AF_INET6; m_sockaddr_union.m_ip_v6.sin6_port = htons(port); } else { m_sockaddr_union.m_ip_v4 = sockaddr_ip_v4_type(); // Clear m_sockaddr_union.m_ip_v4.sin_family = AF_UNSPEC; m_sockaddr_union.m_ip_v4.sin_port = htons(port); m_sockaddr_union.m_ip_v4.sin_addr.s_addr = INADDR_ANY; } } inline Endpoint::Endpoint(const Address& addr, port_type port) { if (addr.m_is_ip_v6) { m_protocol = StreamProtocol::ip_v6(); m_sockaddr_union.m_ip_v6.sin6_family = AF_INET6; m_sockaddr_union.m_ip_v6.sin6_port = htons(port); m_sockaddr_union.m_ip_v6.sin6_flowinfo = 0; m_sockaddr_union.m_ip_v6.sin6_addr = addr.m_union.m_ip_v6; m_sockaddr_union.m_ip_v6.sin6_scope_id = addr.m_ip_v6_scope_id; } else { m_protocol = StreamProtocol::ip_v4(); m_sockaddr_union.m_ip_v4.sin_family = AF_INET; m_sockaddr_union.m_ip_v4.sin_port = htons(port); m_sockaddr_union.m_ip_v4.sin_addr = addr.m_union.m_ip_v4; } } inline Endpoint::List::iterator Endpoint::List::begin() const noexcept { return m_endpoints.data(); } inline Endpoint::List::iterator Endpoint::List::end() const noexcept { return m_endpoints.data() + m_endpoints.size(); } inline std::size_t Endpoint::List::size() const noexcept { return 
m_endpoints.size();
}

inline bool Endpoint::List::empty() const noexcept
{
    return m_endpoints.size() == 0;
}

// ---------------- Service::OperQueue ----------------
//
// The queue is a circular singly linked list threaded through the `m_next`
// member of the operations. `m_back` points at the most recently pushed
// operation and `m_back->m_next` is the front of the queue. A null `m_back`
// means the queue is empty.

template <class Oper>
inline bool Service::OperQueue<Oper>::empty() const noexcept
{
    return !m_back;
}

/// Append `op` to the back of the queue, taking over ownership of it. The
/// operation must not currently be linked into any queue (its `m_next` must be
/// null).
template <class Oper>
inline void Service::OperQueue<Oper>::push_back(LendersOperPtr op) noexcept
{
    REALM_ASSERT(!op->m_next);
    if (m_back) {
        // New back links to the current front; old back links to the new back.
        op->m_next = m_back->m_next;
        m_back->m_next = op.get();
    }
    else {
        // A single element links to itself.
        op->m_next = op.get();
    }
    m_back = op.release();
}

/// Move all operations of `q` to the back of this queue, leaving `q` empty.
/// Does nothing if `q` is empty.
template <class Oper>
template <class Oper2>
inline void Service::OperQueue<Oper>::push_back(OperQueue<Oper2>& q) noexcept
{
    if (!q.m_back)
        return;
    // Swapping the two "back -> front" links splices the two rings into one:
    // this queue's back then links to q's front, and q's back links to this
    // queue's front.
    if (m_back)
        std::swap(m_back->m_next, q.m_back->m_next);
    m_back = q.m_back;
    q.m_back = nullptr;
}

/// Unlink and return the operation at the front of the queue, or a null
/// pointer if the queue is empty. The returned operation has a null `m_next`.
template <class Oper>
inline auto Service::OperQueue<Oper>::pop_front() noexcept -> LendersOperPtr
{
    Oper* op = nullptr;
    if (m_back) {
        op = static_cast<Oper*>(m_back->m_next);
        if (op != m_back) {
            m_back->m_next = op->m_next;
        }
        else {
            // That was the only element.
            m_back = nullptr;
        }
        op->m_next = nullptr;
    }
    return LendersOperPtr(op);
}

/// Remove every operation from the queue, handing each one in turn to a
/// LendersOperPtr so that its deleter disposes of it.
template <class Oper>
inline void Service::OperQueue<Oper>::clear() noexcept
{
    if (m_back) {
        // Start at the back and walk the ring. `op->m_next` is read before
        // reset() disposes of the current element. After the first step
        // `m_back` is used only as a value to compare against, never
        // dereferenced.
        LendersOperPtr op(m_back);
        while (op->m_next != m_back)
            op.reset(static_cast<Oper*>(op->m_next));
        m_back = nullptr;
    }
}

/// Move construction steals the ring and leaves `q` empty.
template <class Oper>
inline Service::OperQueue<Oper>::OperQueue(OperQueue&& q) noexcept
    : m_back{q.m_back}
{
    q.m_back = nullptr;
}

// ---------------- Service::Descriptor ----------------

inline Service::Descriptor::Descriptor(Impl& s) noexcept
    : service_impl{s}
{
}

/// Closes the descriptor if it is still open.
inline Service::Descriptor::~Descriptor() noexcept
{
    if (is_open())
        close();
}

/// Adopt the native handle `fd`. The descriptor must not already be open.
/// `in_blocking_mode` records the mode the handle is currently in. When epoll
/// or kqueue is in use, the readiness and registration state is reset as well.
inline void Service::Descriptor::assign(native_handle_type fd, bool in_blocking_mode) noexcept
{
    REALM_ASSERT(!is_open());
    m_fd = fd;
    m_in_blocking_mode = in_blocking_mode;
#if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE
    m_read_ready = false;
    m_write_ready = false;
    m_imminent_end_of_input = false;
    m_is_registered = false;
#endif
}

inline void
Service::Descriptor::close() noexcept { REALM_ASSERT(is_open()); #if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE if (m_is_registered) deregister_for_async(); m_is_registered = false; #endif do_close(); } inline auto Service::Descriptor::release() noexcept -> native_handle_type { REALM_ASSERT(is_open()); #if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE if (m_is_registered) deregister_for_async(); m_is_registered = false; #endif return do_release(); } inline bool Service::Descriptor::is_open() const noexcept { return (m_fd != -1); } inline auto Service::Descriptor::native_handle() const noexcept -> native_handle_type { return m_fd; } inline bool Service::Descriptor::in_blocking_mode() const noexcept { return m_in_blocking_mode; } template <class Oper, class... Args> inline void Service::Descriptor::initiate_oper(std::unique_ptr<Oper, LendersOperDeleter> op, Args&&... args) { Service::Want want = op->initiate(std::forward<Args>(args)...); // Throws add_initiated_oper(std::move(op), want); // Throws } inline void Service::Descriptor::ensure_blocking_mode() { // Assuming that descriptors are either used mostly in blocking mode, or // mostly in nonblocking mode. if (REALM_UNLIKELY(!m_in_blocking_mode)) { bool value = false; set_nonblock_flag(value); // Throws m_in_blocking_mode = true; } } inline void Service::Descriptor::ensure_nonblocking_mode() { // Assuming that descriptors are either used mostly in blocking mode, or // mostly in nonblocking mode. 
if (REALM_UNLIKELY(m_in_blocking_mode)) { bool value = true; set_nonblock_flag(value); // Throws m_in_blocking_mode = false; } } inline bool Service::Descriptor::assume_read_would_block() const noexcept { #if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE return !m_in_blocking_mode && !m_read_ready; #else return false; #endif } inline bool Service::Descriptor::assume_write_would_block() const noexcept { #if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE return !m_in_blocking_mode && !m_write_ready; #else return false; #endif } inline void Service::Descriptor::set_read_ready(bool value) noexcept { #if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE m_read_ready = value; #else // No-op static_cast<void>(value); #endif } inline void Service::Descriptor::set_write_ready(bool value) noexcept { #if REALM_NETWORK_USE_EPOLL || REALM_HAVE_KQUEUE m_write_ready = value; #else // No-op static_cast<void>(value); #endif } // ---------------- Service ---------------- class Service::AsyncOper { public: bool in_use() const noexcept; bool is_complete() const noexcept; bool is_canceled() const noexcept; void cancel() noexcept; /// Every object of type \ref AsyncOper must be destroyed either by a call /// to this function or to recycle(). This function recycles the operation /// object (commits suicide), even if it throws. virtual void recycle_and_execute() = 0; /// Every object of type \ref AsyncOper must be destroyed either by a call /// to recycle_and_execute() or to this function. This function destroys the /// object (commits suicide). virtual void recycle() noexcept = 0; /// Must be called when the owner dies, and the object is in use (not an /// instance of UnusedOper). virtual void orphan() noexcept = 0; protected: AsyncOper(std::size_t size, bool in_use) noexcept; virtual ~AsyncOper() noexcept {} void set_is_complete(bool value) noexcept; template <class H, class... 
Args>
    void do_recycle_and_execute(bool orphaned, H& handler, Args&&...);
    void do_recycle(bool orphaned) noexcept;

private:
    std::size_t m_size; // Allocated number of bytes
    bool m_in_use = false;
    // Set to true when the operation completes successfully or fails. If the
    // operation is canceled before this happens, it will never be set to
    // true. Always false when not in use
    bool m_complete = false;
    // Set to true when the operation is canceled. Always false when not in use.
    bool m_canceled = false;
    AsyncOper* m_next = nullptr; // Always null when not in use
    template <class H, class... Args>
    void do_recycle_and_execute_helper(bool orphaned, bool& was_recycled, H handler, Args...);
    friend class Service;
};

/// Common base of resolve operations. Holds a pointer to the owning Resolver
/// (null once orphan() has been called), the query, and storage for the
/// resulting endpoint list and error code.
class Service::ResolveOperBase : public AsyncOper {
public:
    ResolveOperBase(std::size_t size, Resolver& resolver, Resolver::Query query) noexcept
        : AsyncOper{size, true} // Second argument is `in_use`
        , m_resolver{&resolver}
        , m_query{std::move(query)}
    {
    }
    /// Mark the operation as complete.
    void complete() noexcept
    {
        set_is_complete(true);
    }
    /// Destroy the operation without executing its handler. Asserts that the
    /// operation has been orphaned.
    void recycle() noexcept override final
    {
        bool orphaned = !m_resolver;
        REALM_ASSERT(orphaned);
        // Note: do_recycle() commits suicide.
        do_recycle(orphaned);
    }
    /// Detach the operation from its resolver.
    void orphan() noexcept override final
    {
        m_resolver = nullptr;
    }

protected:
    Resolver* m_resolver;
    Resolver::Query m_query;
    Endpoint::List m_endpoints;
    std::error_code m_error_code;
    friend class Service;
};

/// Common base of deadline timer wait operations. Holds a pointer to the
/// owning DeadlineTimer (null once orphan() has been called) and the
/// expiration time.
class Service::WaitOperBase : public AsyncOper {
public:
    WaitOperBase(std::size_t size, DeadlineTimer& timer, clock::time_point expiration_time) noexcept
        : AsyncOper{size, true} // Second argument is `in_use`
        , m_timer{&timer}
        , m_expiration_time{expiration_time}
    {
    }
    /// Mark the operation as complete.
    void complete() noexcept
    {
        set_is_complete(true);
    }
    /// Destroy the operation without executing its handler. Asserts that the
    /// operation has been orphaned.
    void recycle() noexcept override final
    {
        bool orphaned = !m_timer;
        REALM_ASSERT(orphaned);
        // Note: do_recycle() commits suicide.
        do_recycle(orphaned);
    }
    /// Detach the operation from its timer.
    void orphan() noexcept override final
    {
        m_timer = nullptr;
    }

protected:
    DeadlineTimer* m_timer;
    clock::time_point m_expiration_time;
    friend class Service;
};

/// Common base of trigger operations. In addition to being an AsyncOper, it is
/// reference counted through util::AtomicRefCountBase.
class Service::TriggerExecOperBase : public AsyncOper, public util::AtomicRefCountBase {
public:
    TriggerExecOperBase(Impl& service) noexcept
        : AsyncOper{0, false} // First arg is `size` (unused), second arg is `in_use`
        , m_service{&service}
    {
    }
    /// Asserts that the operation is in use and has been orphaned, then adopts
    /// `this` into a temporary bind_ptr.
    /// NOTE(review): presumably adopting takes over an existing reference
    /// without adding one, so the reference is dropped at the end of this
    /// function -- confirm against util::bind_ptr.
    void recycle() noexcept override final
    {
        REALM_ASSERT(in_use());
        REALM_ASSERT(!m_service);
        // Note: Potential suicide when `self` goes out of scope
        util::bind_ptr<TriggerExecOperBase> self{this, util::bind_ptr_base::adopt_tag{}};
    }
    /// Detach the operation from its service. Must not already be detached.
    void orphan() noexcept override final
    {
        REALM_ASSERT(m_service);
        m_service = nullptr;
    }
    /// Forward to Service::trigger_exec(). Must not be called after orphan().
    void trigger() noexcept
    {
        REALM_ASSERT(m_service);
        Service::trigger_exec(*m_service, *this);
    }

protected:
    Impl* m_service;
};

/// Common base of post operations. Holds a reference to the service
/// implementation, which is used to recycle the operation object.
class Service::PostOperBase : public AsyncOper {
public:
    PostOperBase(std::size_t size, Impl& service) noexcept
        : AsyncOper{size, true} // Second argument is `in_use`
        , m_service{service}
    {
    }
    void recycle() noexcept override final
    {
        // Service::recycle_post_oper() destroys this operation object
        Service::recycle_post_oper(m_service, this);
    }
    void orphan() noexcept override final
    {
        REALM_ASSERT(false); // Never called
    }

protected:
    Impl& m_service;
};

/// Post operation carrying a handler of type `H`. The handler is invoked with
/// `Status::OK()`.
template <class H>
class Service::PostOper : public PostOperBase {
public:
    PostOper(std::size_t size, Impl& service, H&& handler)
        : PostOperBase{size, service}
        , m_handler{std::move(handler)}
    {
    }
    void recycle_and_execute() override final
    {
        // Recycle the operation object before the handler is executed, such
        // that the memory is available for a new post operation that might be
        // initiated during the execution of the handler.
        bool was_recycled = false;
        try {
            // The handler is moved to a local first, because recycling
            // destroys this object and `m_handler` along with it.
            H handler = std::move(m_handler); // Throws
            // Service::recycle_post_oper() destroys this operation object
            Service::recycle_post_oper(m_service, this);
            was_recycled = true;
            handler(Status::OK()); // Throws
        }
        catch (...) {
            // Only reached without a prior recycle if moving the handler threw.
            if (!was_recycled) {
                // Service::recycle_post_oper() destroys this operation object
                Service::recycle_post_oper(m_service, this);
            }
            throw;
        }
    }

private:
    H m_handler;
};

/// Common base of operations that are tied to a descriptor.
class Service::IoOper : public AsyncOper {
public:
    IoOper(std::size_t size) noexcept
        : AsyncOper{size, true} // Second argument is `in_use`
    {
    }
    virtual Descriptor& descriptor() noexcept = 0;
    /// Advance this operation and figure out whether it needs to read from,
    /// or write to the underlying descriptor to advance further. This function
    /// must return Want::read if the operation needs to read, or Want::write if
    /// the operation needs to write to advance further. If the operation
    /// completes (due to success or failure), this function must return
    /// Want::nothing.
    virtual Want advance() noexcept = 0;
};

/// Operation object that is not in use (constructed with `in_use` set to
/// false). None of its virtual functions may ever be called.
class Service::UnusedOper : public AsyncOper {
public:
    UnusedOper(std::size_t size) noexcept
        : AsyncOper{size, false} // Second argument is `in_use`
    {
    }
    void recycle_and_execute() override final
    {
        // Must never be called
        REALM_ASSERT(false);
    }
    void recycle() noexcept override final
    {
        // Must never be called
        REALM_ASSERT(false);
    }
    void orphan() noexcept override final
    {
        // Must never be called
        REALM_ASSERT(false);
    }
};

// `S` must be a stream class with the following member functions:
//
//    Socket& lowest_layer() noexcept;
//
//    void do_init_read_async(std::error_code& ec, Want& want) noexcept;
//    void do_init_write_async(std::error_code& ec, Want& want) noexcept;
//
//    std::size_t do_read_some_sync(char* buffer, std::size_t size,
//                                  std::error_code& ec) noexcept;
//    std::size_t do_write_some_sync(const char* data, std::size_t size,
//                                   std::error_code& ec) noexcept;
//    std::size_t do_read_some_async(char* buffer, std::size_t size,
//
//                                   std::error_code& ec, Want& want) noexcept;
//    std::size_t do_write_some_async(const char* data, std::size_t size,
//                                    std::error_code& ec, Want& want) noexcept;
//
// If an error occurs during any of these 6 functions, the `ec` argument must be
// set accordingly. Otherwise the `ec` argument must be set to
// `std::error_code()`.
//
// The do_init_*_async() functions must update the `want` argument to indicate
// how the operation must be initiated:
//
//    Want::read      Wait for read readiness, then call do_*_some_async().
//    Want::write     Wait for write readiness, then call do_*_some_async().
//    Want::nothing   Call do_*_some_async() immediately without waiting for
//                    read or write readiness.
//
// If end-of-input occurs while reading, do_read_some_*() must fail, set `ec` to
// MiscExtErrors::end_of_input, and return zero.
//
// If an error occurs during reading or writing, do_*_some_sync() must set `ec`
// accordingly (to something other than `std::error_code()`) and return
// zero. Otherwise they must set `ec` to `std::error_code()` and return the
// number of bytes read or written, which **must** be at least 1. If the
// underlying socket is in nonblocking mode, and no bytes could be immediately
// read or written, these functions must fail with
// `error::resource_unavailable_try_again`.
//
// If an error occurs during reading or writing, do_*_some_async() must set `ec`
// accordingly (to something other than `std::error_code()`), `want` to
// `Want::nothing`, and return zero. Otherwise they must set `ec` to
// `std::error_code()` and return the number of bytes read or written, which
// must be zero if no bytes could be immediately read or written. Note, in this
// case it is not an error if the underlying socket is in nonblocking mode, and
// no bytes could be immediately read or written. When these functions succeed,
// but return zero because no bytes could be immediately read or written, they
// must set `want` to something other than `Want::nothing`.
//
// If no error occurs, do_*_some_async() must set `want` to indicate how the
// operation should proceed if additional data needs to be read or written, or
// if no bytes were transferred:
//
//    Want::read      Wait for read readiness, then call do_*_some_async() again.
//    Want::write     Wait for write readiness, then call do_*_some_async() again.
//    Want::nothing   Call do_*_some_async() again without waiting for read or
//                    write readiness.
//
// NOTE: If, for example, do_read_some_async() sets `want` to `Want::write`, it
// means that the stream needs to write data to the underlying TCP socket before
// it is able to deliver any additional data to the caller. While such a
// situation will never occur on a raw TCP socket, it can occur on an SSL stream
// (Secure Socket Layer).
//
// When do_*_some_async() returns `n`, at least one of the following conditions
// must be true:
//
//    n > 0                     Bytes were transferred.
//    ec != std::error_code()   An error occurred.
//    want != Want::nothing     Wait for read/write readiness.
//
// This is of critical importance, as it is the only way we can avoid falling
// into a busy loop of repeated invocations of do_*_some_async().
//
// NOTE: do_*_some_async() are allowed to set `want` to `Want::read` or
// `Want::write`, even when they successfully transfer a nonzero number of bytes.
template <class S> class Service::BasicStreamOps { public: class StreamOper; class ReadOperBase; class WriteOperBase; class BufferedReadOperBase; template <class H> class ReadOper; template <class H> class WriteOper; template <class H> class BufferedReadOper; using LendersReadOperPtr = std::unique_ptr<ReadOperBase, LendersOperDeleter>; using LendersWriteOperPtr = std::unique_ptr<WriteOperBase, LendersOperDeleter>; using LendersBufferedReadOperPtr = std::unique_ptr<BufferedReadOperBase, LendersOperDeleter>; // Synchronous read static std::size_t read(S& stream, char* buffer, std::size_t size, std::error_code& ec) { REALM_ASSERT(!stream.lowest_layer().m_read_oper || !stream.lowest_layer().m_read_oper->in_use()); stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws char* begin = buffer; char* end = buffer + size; char* curr = begin; for (;;) { if (curr == end) { ec = std::error_code(); // Success break; } char* buffer_2 = curr; std::size_t size_2 = std::size_t(end - curr); std::size_t n = stream.do_read_some_sync(buffer_2, size_2, ec); if (REALM_UNLIKELY(ec)) break; REALM_ASSERT(n > 0); REALM_ASSERT(n <= size_2); curr += n; } std::size_t n = std::size_t(curr - begin); return n; } // Synchronous write static std::size_t write(S& stream, const char* data, std::size_t size, std::error_code& ec) { REALM_ASSERT(!stream.lowest_layer().m_write_oper || !stream.lowest_layer().m_write_oper->in_use()); stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws const char* begin = data; const char* end = data + size; const char* curr = begin; for (;;) { if (curr == end) { ec = std::error_code(); // Success break; } const char* data_2 = curr; std::size_t size_2 = std::size_t(end - curr); std::size_t n = stream.do_write_some_sync(data_2, size_2, ec); if (REALM_UNLIKELY(ec)) break; REALM_ASSERT(n > 0); REALM_ASSERT(n <= size_2); curr += n; } std::size_t n = std::size_t(curr - begin); return n; } // Synchronous read static std::size_t buffered_read(S& stream, char* 
buffer, std::size_t size, int delim, ReadAheadBuffer& rab, std::error_code& ec) { REALM_ASSERT(!stream.lowest_layer().m_read_oper || !stream.lowest_layer().m_read_oper->in_use()); stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws char* begin = buffer; char* end = buffer + size; char* curr = begin; for (;;) { bool complete = rab.read(curr, end, delim, ec); if (complete) break; rab.refill_sync(stream, ec); if (REALM_UNLIKELY(ec)) break; } std::size_t n = (curr - begin); return n; } // Synchronous read static std::size_t read_some(S& stream, char* buffer, std::size_t size, std::error_code& ec) { REALM_ASSERT(!stream.lowest_layer().m_read_oper || !stream.lowest_layer().m_read_oper->in_use()); stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws return stream.do_read_some_sync(buffer, size, ec); } // Synchronous write static std::size_t write_some(S& stream, const char* data, std::size_t size, std::error_code& ec) { REALM_ASSERT(!stream.lowest_layer().m_write_oper || !stream.lowest_layer().m_write_oper->in_use()); stream.lowest_layer().m_desc.ensure_blocking_mode(); // Throws return stream.do_write_some_sync(data, size, ec); } template <class H> static void async_read(S& stream, char* buffer, std::size_t size, bool is_read_some, H&& handler) { char* begin = buffer; char* end = buffer + size; LendersReadOperPtr op = Service::alloc<ReadOper<H>>(stream.lowest_layer().m_read_oper, stream, is_read_some, begin, end, std::move(handler)); // Throws stream.lowest_layer().m_desc.initiate_oper(std::move(op)); // Throws } template <class H> static void async_write(S& stream, const char* data, std::size_t size, bool is_write_some, H&& handler) { const char* begin = data; const char* end = data + size; LendersWriteOperPtr op = Service::alloc<WriteOper<H>>( stream.lowest_layer().m_write_oper, stream, is_write_some, begin, end, std::move(handler)); // Throws stream.lowest_layer().m_desc.initiate_oper(std::move(op)); // Throws } template <class H> static void 
async_buffered_read(S& stream, char* buffer, std::size_t size, int delim, ReadAheadBuffer& rab, H&& handler) { char* begin = buffer; char* end = buffer + size; LendersBufferedReadOperPtr op = Service::alloc<BufferedReadOper<H>>(stream.lowest_layer().m_read_oper, stream, begin, end, delim, rab, std::move(handler)); // Throws stream.lowest_layer().m_desc.initiate_oper(std::move(op)); // Throws } }; template <class S> class Service::BasicStreamOps<S>::StreamOper : public IoOper { public: StreamOper(std::size_t size, S& stream) noexcept : IoOper{size} , m_stream{&stream} { } void recycle() noexcept override final { bool orphaned = !m_stream; REALM_ASSERT(orphaned); // Note: do_recycle() commits suicide. do_recycle(orphaned); } void orphan() noexcept override final { m_stream = nullptr; } Descriptor& descriptor() noexcept override final { return m_stream->lowest_layer().m_desc; } protected: S* m_stream; std::error_code m_error_code; }; template <class S> class Service::BasicStreamOps<S>::ReadOperBase : public StreamOper { public: ReadOperBase(std::size_t size, S& stream, bool is_read_some, char* begin, char* end) noexcept : StreamOper{size, stream} , m_is_read_some{is_read_some} , m_begin{begin} , m_end{end} { } Want initiate() { auto& s = *this; REALM_ASSERT(this == s.m_stream->lowest_layer().m_read_oper.get()); REALM_ASSERT(!s.is_complete()); REALM_ASSERT(s.m_curr <= s.m_end); Want want = Want::nothing; if (REALM_UNLIKELY(s.m_curr == s.m_end)) { s.set_is_complete(true); // Success } else { s.m_stream->lowest_layer().m_desc.ensure_nonblocking_mode(); // Throws s.m_stream->do_init_read_async(s.m_error_code, want); if (want == Want::nothing) { if (REALM_UNLIKELY(s.m_error_code)) { s.set_is_complete(true); // Failure } else { want = advance(); } } } return want; } Want advance() noexcept override final { auto& s = *this; REALM_ASSERT(!s.is_complete()); REALM_ASSERT(!s.is_canceled()); REALM_ASSERT(!s.m_error_code); REALM_ASSERT(s.m_curr < s.m_end); 
REALM_ASSERT(!s.m_is_read_some || s.m_curr == m_begin); for (;;) { // Read into callers buffer char* buffer = s.m_curr; std::size_t size = std::size_t(s.m_end - s.m_curr); Want want = Want::nothing; std::size_t n = s.m_stream->do_read_some_async(buffer, size, s.m_error_code, want); REALM_ASSERT(n > 0 || s.m_error_code || want != Want::nothing); // No busy loop, please // Any errors reported by do_read_some_async() (other than end_of_input) should always return 0 bool got_nothing = (n == 0); if (got_nothing) { if (REALM_UNLIKELY(s.m_error_code)) { s.set_is_complete(true); // Failure return Want::nothing; } // Got nothing, but want something return want; } REALM_ASSERT(!s.m_error_code); // Check for completion REALM_ASSERT(n <= size); s.m_curr += n; if (s.m_is_read_some || s.m_curr == s.m_end) { s.set_is_complete(true); // Success return Want::nothing; } if (want != Want::nothing) return want; REALM_ASSERT(n < size); } } protected: const bool m_is_read_some; char* const m_begin; // May be dangling after cancellation char* const m_end; // May be dangling after cancellation char* m_curr = m_begin; // May be dangling after cancellation }; template <class S> class Service::BasicStreamOps<S>::WriteOperBase : public StreamOper { public: WriteOperBase(std::size_t size, S& stream, bool is_write_some, const char* begin, const char* end) noexcept : StreamOper{size, stream} , m_is_write_some{is_write_some} , m_begin{begin} , m_end{end} { } Want initiate() { auto& s = *this; REALM_ASSERT(this == s.m_stream->lowest_layer().m_write_oper.get()); REALM_ASSERT(!s.is_complete()); REALM_ASSERT(s.m_curr <= s.m_end); Want want = Want::nothing; if (REALM_UNLIKELY(s.m_curr == s.m_end)) { s.set_is_complete(true); // Success } else { s.m_stream->lowest_layer().m_desc.ensure_nonblocking_mode(); // Throws s.m_stream->do_init_write_async(s.m_error_code, want); if (want == Want::nothing) { if (REALM_UNLIKELY(s.m_error_code)) { s.set_is_complete(true); // Failure } else { want = advance(); } } } 
return want; } Want advance() noexcept override final { auto& s = *this; REALM_ASSERT(!s.is_complete()); REALM_ASSERT(!s.is_canceled()); REALM_ASSERT(!s.m_error_code); REALM_ASSERT(s.m_curr < s.m_end); REALM_ASSERT(!s.m_is_write_some || s.m_curr == s.m_begin); for (;;) { // Write from callers buffer const char* data = s.m_curr; std::size_t size = std::size_t(s.m_end - s.m_curr); Want want = Want::nothing; std::size_t n = s.m_stream->do_write_some_async(data, size, s.m_error_code, want); REALM_ASSERT(n > 0 || s.m_error_code || want != Want::nothing); // No busy loop, please bool wrote_nothing = (n == 0); if (wrote_nothing) { if (REALM_UNLIKELY(s.m_error_code)) { s.set_is_complete(true); // Failure return Want::nothing; } // Wrote nothing, but want something written return want; } REALM_ASSERT(!s.m_error_code); // Check for completion REALM_ASSERT(n <= size); s.m_curr += n; if (s.m_is_write_some || s.m_curr == s.m_end) { s.set_is_complete(true); // Success return Want::nothing; } if (want != Want::nothing) return want; REALM_ASSERT(n < size); } } protected: const bool m_is_write_some; const char* const m_begin; // May be dangling after cancellation const char* const m_end; // May be dangling after cancellation const char* m_curr = m_begin; // May be dangling after cancellation }; template <class S> class Service::BasicStreamOps<S>::BufferedReadOperBase : public StreamOper { public: BufferedReadOperBase(std::size_t size, S& stream, char* begin, char* end, int delim, ReadAheadBuffer& rab) noexcept : StreamOper{size, stream} , m_read_ahead_buffer{rab} , m_begin{begin} , m_end{end} , m_delim{delim} { } Want initiate() { auto& s = *this; REALM_ASSERT(this == s.m_stream->lowest_layer().m_read_oper.get()); REALM_ASSERT(!s.is_complete()); Want want = Want::nothing; bool complete = s.m_read_ahead_buffer.read(s.m_curr, s.m_end, s.m_delim, s.m_error_code); if (complete) { s.set_is_complete(true); // Success or failure } else { 
s.m_stream->lowest_layer().m_desc.ensure_nonblocking_mode(); // Throws s.m_stream->do_init_read_async(s.m_error_code, want); if (want == Want::nothing) { if (REALM_UNLIKELY(s.m_error_code)) { s.set_is_complete(true); // Failure } else { want = advance(); } } } return want; } Want advance() noexcept override final { auto& s = *this; REALM_ASSERT(!s.is_complete()); REALM_ASSERT(!s.is_canceled()); REALM_ASSERT(!s.m_error_code); REALM_ASSERT(s.m_read_ahead_buffer.empty()); REALM_ASSERT(s.m_curr < s.m_end); for (;;) { // Fill read-ahead buffer from stream (is empty now) Want want = Want::nothing; bool nonempty = s.m_read_ahead_buffer.refill_async(*s.m_stream, s.m_error_code, want); REALM_ASSERT(nonempty || s.m_error_code || want != Want::nothing); // No busy loop, please bool got_nothing = !nonempty; if (got_nothing) { if (REALM_UNLIKELY(s.m_error_code)) { s.set_is_complete(true); // Failure return Want::nothing; } // Got nothing, but want something return want; } // Transfer buffered data to callers buffer bool complete = s.m_read_ahead_buffer.read(s.m_curr, s.m_end, s.m_delim, s.m_error_code); if (complete || s.m_error_code == util::MiscExtErrors::end_of_input) { s.set_is_complete(true); // Success or failure (delim_not_found or end_of_input) return Want::nothing; } if (want != Want::nothing) return want; } } protected: ReadAheadBuffer& m_read_ahead_buffer; // May be dangling after cancellation char* const m_begin; // May be dangling after cancellation char* const m_end; // May be dangling after cancellation char* m_curr = m_begin; // May be dangling after cancellation const int m_delim; }; template <class S> template <class H> class Service::BasicStreamOps<S>::ReadOper : public ReadOperBase { public: ReadOper(std::size_t size, S& stream, bool is_read_some, char* begin, char* end, H&& handler) : ReadOperBase{size, stream, is_read_some, begin, end} , m_handler{std::move(handler)} { } void recycle_and_execute() override final { auto& s = *this; 
REALM_ASSERT(s.is_complete() || s.is_canceled()); REALM_ASSERT(s.is_complete() == (s.m_error_code || s.m_curr == s.m_end || (s.m_is_read_some && s.m_curr != s.m_begin))); REALM_ASSERT(s.m_curr >= s.m_begin); bool orphaned = !s.m_stream; std::error_code ec = s.m_error_code; if (s.is_canceled()) ec = util::error::operation_aborted; std::size_t num_bytes_transferred = std::size_t(s.m_curr - s.m_begin); // Note: do_recycle_and_execute() commits suicide. s.template do_recycle_and_execute<H>(orphaned, s.m_handler, ec, num_bytes_transferred); // Throws } private: H m_handler; }; template <class S> template <class H> class Service::BasicStreamOps<S>::WriteOper : public WriteOperBase { public: WriteOper(std::size_t size, S& stream, bool is_write_some, const char* begin, const char* end, H&& handler) : WriteOperBase{size, stream, is_write_some, begin, end} , m_handler{std::move(handler)} { } void recycle_and_execute() override final { auto& s = *this; REALM_ASSERT(s.is_complete() || s.is_canceled()); REALM_ASSERT(s.is_complete() == (s.m_error_code || s.m_curr == s.m_end || (s.m_is_write_some && s.m_curr != s.m_begin))); REALM_ASSERT(s.m_curr >= s.m_begin); bool orphaned = !s.m_stream; std::error_code ec = s.m_error_code; if (s.is_canceled()) ec = util::error::operation_aborted; std::size_t num_bytes_transferred = std::size_t(s.m_curr - s.m_begin); // Note: do_recycle_and_execute() commits suicide. 
s.template do_recycle_and_execute<H>(orphaned, s.m_handler, ec, num_bytes_transferred); // Throws } private: H m_handler; }; template <class S> template <class H> class Service::BasicStreamOps<S>::BufferedReadOper : public BufferedReadOperBase { public: BufferedReadOper(std::size_t size, S& stream, char* begin, char* end, int delim, ReadAheadBuffer& rab, H&& handler) : BufferedReadOperBase{size, stream, begin, end, delim, rab} , m_handler{std::move(handler)} { } void recycle_and_execute() override final { auto& s = *this; REALM_ASSERT(s.is_complete() || (s.is_canceled() && !s.m_error_code)); REALM_ASSERT(s.is_canceled() || s.m_error_code || (s.m_delim != std::char_traits<char>::eof() ? s.m_curr > s.m_begin && s.m_curr[-1] == std::char_traits<char>::to_char_type(s.m_delim) : s.m_curr == s.m_end)); REALM_ASSERT(s.m_curr >= s.m_begin); bool orphaned = !s.m_stream; std::error_code ec = s.m_error_code; if (s.is_canceled()) ec = util::error::operation_aborted; std::size_t num_bytes_transferred = std::size_t(s.m_curr - s.m_begin); // Note: do_recycle_and_execute() commits suicide. s.template do_recycle_and_execute<H>(orphaned, s.m_handler, ec, num_bytes_transferred); // Throws } private: H m_handler; }; template <class H> inline void Service::post(H handler) { do_post(&Service::post_oper_constr<H>, sizeof(PostOper<H>), &handler); } inline void Service::OwnersOperDeleter::operator()(AsyncOper* op) const noexcept { if (op->in_use()) { op->orphan(); } else { void* addr = op; op->~AsyncOper(); delete[] static_cast<char*>(addr); } } inline void Service::LendersOperDeleter::operator()(AsyncOper* op) const noexcept { op->recycle(); // Suicide } template <class Oper, class... Args> std::unique_ptr<Oper, Service::LendersOperDeleter> Service::alloc(OwnersOperPtr& owners_ptr, Args&&... 
args) { void* addr = owners_ptr.get(); std::size_t size; if (REALM_LIKELY(addr)) { REALM_ASSERT(!owners_ptr->in_use()); size = owners_ptr->m_size; // We can use static dispatch in the destructor call here, since an // object, that is not in use, is always an instance of UnusedOper. REALM_ASSERT(dynamic_cast<UnusedOper*>(owners_ptr.get())); static_cast<UnusedOper*>(owners_ptr.get())->UnusedOper::~UnusedOper(); if (REALM_UNLIKELY(size < sizeof(Oper))) { owners_ptr.release(); delete[] static_cast<char*>(addr); goto no_object; } } else { no_object: addr = new char[sizeof(Oper)]; // Throws size = sizeof(Oper); owners_ptr.reset(static_cast<AsyncOper*>(addr)); } std::unique_ptr<Oper, LendersOperDeleter> lenders_ptr; try { lenders_ptr.reset(new (addr) Oper(size, std::forward<Args>(args)...)); // Throws } catch (...) { new (addr) UnusedOper(size); // Does not throw throw; } return lenders_ptr; } template <class H> inline Service::PostOperBase* Service::post_oper_constr(void* addr, std::size_t size, Impl& service, void* cookie) { H& handler = *static_cast<H*>(cookie); return new (addr) PostOper<H>(size, service, std::move(handler)); // Throws } inline bool Service::AsyncOper::in_use() const noexcept { return m_in_use; } inline bool Service::AsyncOper::is_complete() const noexcept { return m_complete; } inline void Service::AsyncOper::cancel() noexcept { REALM_ASSERT(m_in_use); REALM_ASSERT(!m_canceled); m_canceled = true; } inline Service::AsyncOper::AsyncOper(std::size_t size, bool is_in_use) noexcept : m_size{size} , m_in_use{is_in_use} { } inline bool Service::AsyncOper::is_canceled() const noexcept { return m_canceled; } inline void Service::AsyncOper::set_is_complete(bool value) noexcept { REALM_ASSERT(!m_complete); REALM_ASSERT(!value || m_in_use); m_complete = value; } template <class H, class... Args> inline void Service::AsyncOper::do_recycle_and_execute(bool orphaned, H& handler, Args&&... 
args) { // Recycle the operation object before the handler is exceuted, such that // the memory is available for a new post operation that might be initiated // during the execution of the handler. bool was_recycled = false; // ScopeExit to ensure the AsyncOper object was reclaimed/deleted auto at_exit = util::ScopeExit([this, &was_recycled, &orphaned]() noexcept { if (!was_recycled) { do_recycle(orphaned); } }); // We need to copy or move all arguments to be passed to the handler, // such that there is no risk of references to the recycled operation // object being passed to the handler (the passed arguments may be // references to members of the recycled operation object). The easiest // way to achive this, is by forwarding the reference arguments (passed // to this function) to a helper function whose arguments have // nonreference type (`Args...` rather than `Args&&...`). // // Note that the copying and moving of arguments may throw, and it is // important that the operation is still recycled even if that // happens. For that reason, copying and moving of arguments must not // happen until we are in a scope (this scope) that catches and deals // correctly with such exceptions. do_recycle_and_execute_helper(orphaned, was_recycled, std::move(handler), std::forward<Args>(args)...); // Throws // Removed catch to prevent truncating the stack trace on exception } template <class H, class... Args> inline void Service::AsyncOper::do_recycle_and_execute_helper(bool orphaned, bool& was_recycled, H handler, Args... 
args) { do_recycle(orphaned); was_recycled = true; handler(std::move(args)...); // Throws } inline void Service::AsyncOper::do_recycle(bool orphaned) noexcept { REALM_ASSERT(in_use()); void* addr = this; std::size_t size = m_size; this->~AsyncOper(); // Suicide if (orphaned) { delete[] static_cast<char*>(addr); } else { new (addr) UnusedOper(size); } } // ---------------- Resolver ---------------- template <class H> class Resolver::ResolveOper : public Service::ResolveOperBase { public: ResolveOper(std::size_t size, Resolver& r, Query q, H&& handler) : ResolveOperBase{size, r, std::move(q)} , m_handler{std::move(handler)} { } void recycle_and_execute() override final { REALM_ASSERT(is_complete() || (is_canceled() && !m_error_code)); REALM_ASSERT(is_canceled() || m_error_code || !m_endpoints.empty()); bool orphaned = !m_resolver; std::error_code ec = m_error_code; if (is_canceled()) ec = util::error::operation_aborted; // Note: do_recycle_and_execute() commits suicide. do_recycle_and_execute<H>(orphaned, m_handler, ec, std::move(m_endpoints)); // Throws } private: H m_handler; }; inline Resolver::Resolver(Service& service) : m_service_impl{*service.m_impl} { } inline Resolver::~Resolver() noexcept { cancel(); } inline Endpoint::List Resolver::resolve(const Query& q) { std::error_code ec; Endpoint::List list = resolve(q, ec); if (REALM_UNLIKELY(ec)) throw std::system_error(ec); return list; } template <class H> void Resolver::async_resolve(Query query, H&& handler) { Service::LendersResolveOperPtr op = Service::alloc<ResolveOper<H>>(m_resolve_oper, *this, std::move(query), std::move(handler)); // Throws initiate_oper(std::move(op)); // Throws } inline Resolver::Query::Query(std::string service_port, int init_flags) : m_flags{init_flags} , m_service{service_port} { } inline Resolver::Query::Query(const StreamProtocol& prot, std::string service_port, int init_flags) : m_flags{init_flags} , m_protocol{prot} , m_service{service_port} { } inline 
Resolver::Query::Query(std::string host_name, std::string service_port, int init_flags) : m_flags{init_flags} , m_host{host_name} , m_service{service_port} { } inline Resolver::Query::Query(const StreamProtocol& prot, std::string host_name, std::string service_port, int init_flags) : m_flags{init_flags} , m_protocol{prot} , m_host{host_name} , m_service{service_port} { } inline Resolver::Query::~Query() noexcept {} inline int Resolver::Query::flags() const { return m_flags; } inline StreamProtocol Resolver::Query::protocol() const { return m_protocol; } inline std::string Resolver::Query::host() const { return m_host; } inline std::string Resolver::Query::service() const { return m_service; } // ---------------- SocketBase ---------------- inline SocketBase::SocketBase(Service& service) : m_desc{*service.m_impl} { } inline SocketBase::~SocketBase() noexcept { close(); } inline bool SocketBase::is_open() const noexcept { return m_desc.is_open(); } inline auto SocketBase::native_handle() const noexcept -> native_handle_type { return m_desc.native_handle(); } inline void SocketBase::open(const StreamProtocol& prot) { std::error_code ec; if (open(prot, ec)) throw std::system_error(ec); } inline void SocketBase::close() noexcept { if (!is_open()) return; cancel(); m_desc.close(); } template <class O> inline void SocketBase::get_option(O& opt) const { std::error_code ec; if (get_option(opt, ec)) throw std::system_error(ec); } template <class O> inline std::error_code SocketBase::get_option(O& opt, std::error_code& ec) const { opt.get(*this, ec); return ec; } template <class O> inline void SocketBase::set_option(const O& opt) { std::error_code ec; if (set_option(opt, ec)) throw std::system_error(ec); } template <class O> inline std::error_code SocketBase::set_option(const O& opt, std::error_code& ec) { opt.set(*this, ec); return ec; } inline void SocketBase::bind(const Endpoint& ep) { std::error_code ec; if (bind(ep, ec)) throw std::system_error(ec); } inline Endpoint 
SocketBase::local_endpoint() const { std::error_code ec; Endpoint ep = local_endpoint(ec); if (ec) throw std::system_error(ec); return ep; } inline auto SocketBase::release_native_handle() noexcept -> native_handle_type { if (is_open()) { cancel(); return m_desc.release(); } return m_desc.native_handle(); } inline const StreamProtocol& SocketBase::get_protocol() const noexcept { return m_protocol; } template <class T, int opt, class U> inline SocketBase::Option<T, opt, U>::Option(T init_value) : m_value{init_value} { } template <class T, int opt, class U> inline T SocketBase::Option<T, opt, U>::value() const { return m_value; } template <class T, int opt, class U> inline void SocketBase::Option<T, opt, U>::get(const SocketBase& sock, std::error_code& ec) { union { U value; char strut[sizeof(U) + 1]; }; std::size_t value_size = sizeof strut; sock.get_option(opt_enum(opt), &value, value_size, ec); if (!ec) { REALM_ASSERT(value_size == sizeof value); m_value = T(value); } } template <class T, int opt, class U> inline void SocketBase::Option<T, opt, U>::set(SocketBase& sock, std::error_code& ec) const { U value_to_set = U(m_value); sock.set_option(opt_enum(opt), &value_to_set, sizeof value_to_set, ec); } // ---------------- Socket ---------------- class Socket::ConnectOperBase : public Service::IoOper { public: ConnectOperBase(std::size_t size, Socket& sock) noexcept : IoOper{size} , m_socket{&sock} { } Want initiate(const Endpoint& ep) { REALM_ASSERT(this == m_socket->m_write_oper.get()); if (m_socket->initiate_async_connect(ep, m_error_code)) { // Throws set_is_complete(true); // Failure, or immediate completion return Want::nothing; } return Want::write; } Want advance() noexcept override final { REALM_ASSERT(!is_complete()); REALM_ASSERT(!is_canceled()); REALM_ASSERT(!m_error_code); m_socket->finalize_async_connect(m_error_code); set_is_complete(true); return Want::nothing; } void recycle() noexcept override final { bool orphaned = !m_socket; 
REALM_ASSERT(orphaned); // Note: do_recycle() commits suicide. do_recycle(orphaned); } void orphan() noexcept override final { m_socket = nullptr; } Service::Descriptor& descriptor() noexcept override final { return m_socket->m_desc; } protected: Socket* m_socket; std::error_code m_error_code; }; template <class H> class Socket::ConnectOper : public ConnectOperBase { public: ConnectOper(std::size_t size, Socket& sock, H&& handler) : ConnectOperBase{size, sock} , m_handler{std::move(handler)} { } void recycle_and_execute() override final { REALM_ASSERT(is_complete() || (is_canceled() && !m_error_code)); bool orphaned = !m_socket; std::error_code ec = m_error_code; if (is_canceled()) ec = util::error::operation_aborted; // Note: do_recycle_and_execute() commits suicide. do_recycle_and_execute<H>(orphaned, m_handler, ec); // Throws } private: H m_handler; }; inline Socket::Socket(Service& service) : SocketBase{service} { } inline Socket::Socket(Service& service, const StreamProtocol& prot, native_handle_type native_socket) : SocketBase{service} { assign(prot, native_socket); // Throws } inline Socket::~Socket() noexcept {} inline void Socket::connect(const Endpoint& ep) { std::error_code ec; if (connect(ep, ec)) // Throws throw std::system_error(ec); } inline std::size_t Socket::read(char* buffer, std::size_t size) { std::error_code ec; read(buffer, size, ec); // Throws if (ec) throw std::system_error(ec); return size; } inline std::size_t Socket::read(char* buffer, std::size_t size, std::error_code& ec) { return StreamOps::read(*this, buffer, size, ec); // Throws } inline std::size_t Socket::read(char* buffer, std::size_t size, ReadAheadBuffer& rab) { std::error_code ec; read(buffer, size, rab, ec); // Throws if (ec) throw std::system_error(ec); return size; } inline std::size_t Socket::read(char* buffer, std::size_t size, ReadAheadBuffer& rab, std::error_code& ec) { int delim = std::char_traits<char>::eof(); return StreamOps::buffered_read(*this, buffer, size, 
delim, rab, ec); // Throws } inline std::size_t Socket::read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer& rab) { std::error_code ec; std::size_t n = read_until(buffer, size, delim, rab, ec); // Throws if (ec) throw std::system_error(ec); return n; } inline std::size_t Socket::read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer& rab, std::error_code& ec) { int delim_2 = std::char_traits<char>::to_int_type(delim); return StreamOps::buffered_read(*this, buffer, size, delim_2, rab, ec); // Throws } inline std::size_t Socket::write(const char* data, std::size_t size) { std::error_code ec; write(data, size, ec); // Throws if (ec) throw std::system_error(ec); return size; } inline std::size_t Socket::write(const char* data, std::size_t size, std::error_code& ec) { return StreamOps::write(*this, data, size, ec); // Throws } inline std::size_t Socket::read_some(char* buffer, std::size_t size) { std::error_code ec; std::size_t n = read_some(buffer, size, ec); // Throws if (ec) throw std::system_error(ec); return n; } inline std::size_t Socket::read_some(char* buffer, std::size_t size, std::error_code& ec) { return StreamOps::read_some(*this, buffer, size, ec); // Throws } inline std::size_t Socket::write_some(const char* data, std::size_t size) { std::error_code ec; std::size_t n = write_some(data, size, ec); // Throws if (ec) throw std::system_error(ec); return n; } inline std::size_t Socket::write_some(const char* data, std::size_t size, std::error_code& ec) { return StreamOps::write_some(*this, data, size, ec); // Throws } template <class H> inline void Socket::async_connect(const Endpoint& ep, H&& handler) { LendersConnectOperPtr op = Service::alloc<ConnectOper<H>>(m_write_oper, *this, std::move(handler)); // Throws m_desc.initiate_oper(std::move(op), ep); // Throws } template <class H> inline void Socket::async_read(char* buffer, std::size_t size, H&& handler) { bool is_read_some = false; StreamOps::async_read(*this, buffer, 
size, is_read_some, std::move(handler)); // Throws } template <class H> inline void Socket::async_read(char* buffer, std::size_t size, ReadAheadBuffer& rab, H&& handler) { int delim = std::char_traits<char>::eof(); StreamOps::async_buffered_read(*this, buffer, size, delim, rab, std::move(handler)); // Throws } template <class H> inline void Socket::async_read_until(char* buffer, std::size_t size, char delim, ReadAheadBuffer& rab, H&& handler) { int delim_2 = std::char_traits<char>::to_int_type(delim); StreamOps::async_buffered_read(*this, buffer, size, delim_2, rab, std::move(handler)); // Throws } template <class H> inline void Socket::async_write(const char* data, std::size_t size, H&& handler) { bool is_write_some = false; StreamOps::async_write(*this, data, size, is_write_some, std::move(handler)); // Throws } template <class H> inline void Socket::async_read_some(char* buffer, std::size_t size, H&& handler) { bool is_read_some = true; StreamOps::async_read(*this, buffer, size, is_read_some, std::move(handler)); // Throws } template <class H> inline void Socket::async_write_some(const char* data, std::size_t size, H&& handler) { bool is_write_some = true; StreamOps::async_write(*this, data, size, is_write_some, std::move(handler)); // Throws } inline void Socket::shutdown(shutdown_type what) { std::error_code ec; if (shutdown(what, ec)) // Throws throw std::system_error(ec); } inline void Socket::assign(const StreamProtocol& prot, native_handle_type native_socket) { std::error_code ec; if (assign(prot, native_socket, ec)) // Throws throw std::system_error(ec); } inline std::error_code Socket::assign(const StreamProtocol& prot, native_handle_type native_socket, std::error_code& ec) { return do_assign(prot, native_socket, ec); // Throws } inline Socket& Socket::lowest_layer() noexcept { return *this; } inline void Socket::do_init_read_async(std::error_code&, Want& want) noexcept { want = Want::read; // Wait for read readiness before proceeding } inline void 
Socket::do_init_write_async(std::error_code&, Want& want) noexcept { want = Want::write; // Wait for write readiness before proceeding } inline std::size_t Socket::do_read_some_sync(char* buffer, std::size_t size, std::error_code& ec) noexcept { return m_desc.read_some(buffer, size, ec); } inline std::size_t Socket::do_write_some_sync(const char* data, std::size_t size, std::error_code& ec) noexcept { return m_desc.write_some(data, size, ec); } inline std::size_t Socket::do_read_some_async(char* buffer, std::size_t size, std::error_code& ec, Want& want) noexcept { std::error_code ec_2; std::size_t n = m_desc.read_some(buffer, size, ec_2); bool success = (!ec_2 || ec_2 == util::error::resource_unavailable_try_again); if (REALM_UNLIKELY(!success)) { ec = ec_2; want = Want::nothing; // Failure return 0; } ec = std::error_code(); want = Want::read; // Success return n; } inline std::size_t Socket::do_write_some_async(const char* data, std::size_t size, std::error_code& ec, Want& want) noexcept { std::error_code ec_2; std::size_t n = m_desc.write_some(data, size, ec_2); bool success = (!ec_2 || ec_2 == util::error::resource_unavailable_try_again); if (REALM_UNLIKELY(!success)) { ec = ec_2; want = Want::nothing; // Failure return 0; } ec = std::error_code(); want = Want::write; // Success return n; } // ---------------- Acceptor ---------------- class Acceptor::AcceptOperBase : public Service::IoOper { public: AcceptOperBase(std::size_t size, Acceptor& a, Socket& s, Endpoint* e) : IoOper{size} , m_acceptor{&a} , m_socket{s} , m_endpoint{e} { } Want initiate() { REALM_ASSERT(this == m_acceptor->m_read_oper.get()); REALM_ASSERT(!is_complete()); m_acceptor->m_desc.ensure_nonblocking_mode(); // Throws return Want::read; } Want advance() noexcept override final { REALM_ASSERT(!is_complete()); REALM_ASSERT(!is_canceled()); REALM_ASSERT(!m_error_code); REALM_ASSERT(!m_socket.is_open()); Want want = m_acceptor->do_accept_async(m_socket, m_endpoint, m_error_code); if (want == 
Want::nothing)
            set_is_complete(true); // Success or failure
        return want;
    }

    /// Recycle an operation that has been orphaned (asserted); no handler is
    /// involved on this path.
    void recycle() noexcept override final
    {
        bool orphaned = !m_acceptor;
        REALM_ASSERT(orphaned);
        // Note: do_recycle() commits suicide.
        do_recycle(orphaned);
    }

    /// Detach this operation from its acceptor by clearing `m_acceptor`.
    void orphan() noexcept override final
    {
        m_acceptor = nullptr;
    }

    /// Descriptor of the owning acceptor. Dereferences `m_acceptor`, so it
    /// must not be called once the operation has been orphaned.
    Service::Descriptor& descriptor() noexcept override final
    {
        return m_acceptor->m_desc;
    }

protected:
    Acceptor* m_acceptor;         // Set to null by orphan()
    Socket& m_socket;             // May be dangling after cancellation
    Endpoint* const m_endpoint;   // May be dangling after cancellation
    std::error_code m_error_code; // Written by do_accept_async() via advance()
};

/// Accept operation that owns a completion handler of type `H`.
///
/// The handler is moved into the operation on construction and is later
/// handed, together with the resulting error code, to
/// do_recycle_and_execute().
template <class H>
class Acceptor::AcceptOper : public AcceptOperBase {
public:
    AcceptOper(std::size_t size, Acceptor& a, Socket& s, Endpoint* e, H&& handler)
        : AcceptOperBase{size, a, s, e}
        , m_handler{std::move(handler)}
    {
    }

    /// Hand the handler and the final error code to
    /// do_recycle_and_execute(). A canceled operation always reports
    /// `util::error::operation_aborted`.
    void recycle_and_execute() override final
    {
        REALM_ASSERT(is_complete() || (is_canceled() && !m_error_code));
        // Unless canceled or failed, the accepted socket must now be open.
        REALM_ASSERT(is_canceled() || m_error_code || m_socket.is_open());
        bool orphaned = !m_acceptor;
        std::error_code ec = m_error_code;
        if (is_canceled())
            ec = util::error::operation_aborted;
        // Note: do_recycle_and_execute() commits suicide.
do_recycle_and_execute<H>(orphaned, m_handler, ec); // Throws } private: H m_handler; }; inline Acceptor::Acceptor(Service& service) : SocketBase{service} { } inline Acceptor::~Acceptor() noexcept {} inline void Acceptor::listen(int backlog) { std::error_code ec; if (listen(backlog, ec)) // Throws throw std::system_error(ec); } inline void Acceptor::accept(Socket& sock) { std::error_code ec; if (accept(sock, ec)) // Throws throw std::system_error(ec); } inline void Acceptor::accept(Socket& sock, Endpoint& ep) { std::error_code ec; if (accept(sock, ep, ec)) // Throws throw std::system_error(ec); } inline std::error_code Acceptor::accept(Socket& sock, std::error_code& ec) { Endpoint* ep = nullptr; return accept(sock, ep, ec); // Throws } inline std::error_code Acceptor::accept(Socket& sock, Endpoint& ep, std::error_code& ec) { return accept(sock, &ep, ec); // Throws } template <class H> inline void Acceptor::async_accept(Socket& sock, H&& handler) { Endpoint* ep = nullptr; async_accept(sock, ep, std::move(handler)); // Throws } template <class H> inline void Acceptor::async_accept(Socket& sock, Endpoint& ep, H&& handler) { async_accept(sock, &ep, std::move(handler)); // Throws } inline std::error_code Acceptor::accept(Socket& socket, Endpoint* ep, std::error_code& ec) { REALM_ASSERT(!m_read_oper || !m_read_oper->in_use()); if (REALM_UNLIKELY(socket.is_open())) throw util::runtime_error("Socket is already open"); m_desc.ensure_blocking_mode(); // Throws m_desc.accept(socket.m_desc, m_protocol, ep, ec); return ec; } inline Acceptor::Want Acceptor::do_accept_async(Socket& socket, Endpoint* ep, std::error_code& ec) noexcept { std::error_code ec_2; m_desc.accept(socket.m_desc, m_protocol, ep, ec_2); if (ec_2 == util::error::resource_unavailable_try_again) return Want::read; ec = ec_2; return Want::nothing; } template <class H> inline void Acceptor::async_accept(Socket& sock, Endpoint* ep, H&& handler) { if (REALM_UNLIKELY(sock.is_open())) throw util::runtime_error("Socket 
is already open"); LendersAcceptOperPtr op = Service::alloc<AcceptOper<H>>(m_read_oper, *this, sock, ep, std::move(handler)); // Throws m_desc.initiate_oper(std::move(op)); // Throws } // ---------------- DeadlineTimer ---------------- template <class H> class DeadlineTimer::WaitOper : public Service::WaitOperBase { public: WaitOper(std::size_t size, DeadlineTimer& timer, clock::time_point expiration_time, H&& handler) : Service::WaitOperBase{size, timer, expiration_time} , m_handler{std::move(handler)} { } void recycle_and_execute() override final { bool orphaned = !m_timer; Status status = Status::OK(); if (is_canceled()) status = Status{ErrorCodes::OperationAborted, "Timer canceled"}; // Note: do_recycle_and_execute() commits suicide. do_recycle_and_execute<H>(orphaned, m_handler, status); // Throws } private: H m_handler; }; inline DeadlineTimer::DeadlineTimer(Service& service) : m_service_impl{*service.m_impl} { } inline DeadlineTimer::~DeadlineTimer() noexcept { cancel(); } template <class R, class P, class H> inline void DeadlineTimer::async_wait(std::chrono::duration<R, P> delay, H&& handler) { clock::time_point now = clock::now(); // FIXME: This method of detecting overflow does not work. Comparison // between distinct duration types is not overflow safe. Overflow easily // happens in the implied conversion of arguments to the common duration // type (std::common_type<>). 
auto max_add = clock::time_point::max() - now; if (delay > max_add) throw util::overflow_error("Expiration time overflow"); clock::time_point expiration_time = now + delay; initiate_oper(Service::alloc<WaitOper<H>>(m_wait_oper, *this, expiration_time, std::move(handler))); // Throws } // ---------------- ReadAheadBuffer ---------------- inline ReadAheadBuffer::ReadAheadBuffer() : m_buffer{new char[s_size]} // Throws { } inline void ReadAheadBuffer::clear() noexcept { m_begin = nullptr; m_end = nullptr; } inline bool ReadAheadBuffer::empty() const noexcept { return (m_begin == m_end); } template <class S> inline void ReadAheadBuffer::refill_sync(S& stream, std::error_code& ec) noexcept { char* buffer = m_buffer.get(); std::size_t size = s_size; static_assert(noexcept(stream.do_read_some_sync(buffer, size, ec)), ""); std::size_t n = stream.do_read_some_sync(buffer, size, ec); if (REALM_UNLIKELY(n == 0)) return; REALM_ASSERT(!ec); REALM_ASSERT(n <= size); m_begin = m_buffer.get(); m_end = m_begin + n; } template <class S> inline bool ReadAheadBuffer::refill_async(S& stream, std::error_code& ec, Want& want) noexcept { char* buffer = m_buffer.get(); std::size_t size = s_size; static_assert(noexcept(stream.do_read_some_async(buffer, size, ec, want)), ""); std::size_t n = stream.do_read_some_async(buffer, size, ec, want); // Any errors reported by do_read_some_async() (other than end_of_input) should always return 0 if (n == 0) return false; REALM_ASSERT(!ec || ec == util::MiscExtErrors::end_of_input); REALM_ASSERT(n <= size); m_begin = m_buffer.get(); m_end = m_begin + n; return true; } } // namespace realm::sync::network