src/corosio/src/local_connect_pair.cpp

62.5% Lines (25/40) 100.0% List of functions (5/5)
local_connect_pair.cpp
f(x) Functions (5)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/local_connect_pair.hpp>
11 #include <boost/corosio/detail/platform.hpp>
12 #include <boost/corosio/native/detail/make_err.hpp>
13
14 #include <system_error>
15
16 #if BOOST_COROSIO_POSIX
17 #include <fcntl.h>
18 #include <sys/socket.h>
19 #include <sys/un.h>
20 #include <unistd.h>
21 #elif BOOST_COROSIO_HAS_IOCP
22 #include <boost/corosio/native/detail/endpoint_convert.hpp>
23
24 #include <algorithm>
25 #include <cstring>
26 #include <filesystem>
27 #include <random>
28 #include <string>
29 #include <thread>
30
31 #ifndef WIN32_LEAN_AND_MEAN
32 #define WIN32_LEAN_AND_MEAN
33 #endif
34 #include <WinSock2.h>
35
36 #ifndef AF_UNIX
37 #define AF_UNIX 1
38 #endif
39 #endif
40
41 namespace boost::corosio {
42
43 namespace {
44
45 #if BOOST_COROSIO_POSIX
46
47 std::error_code
48 76x make_pair_fds(int type, int& a_fd, int& b_fd) noexcept
49 {
50 int fds[2];
51 76x if (::socketpair(AF_UNIX, type, 0, fds) != 0)
52 return detail::make_err(errno);
53
54 // assign() is documented "adopt-only" and will not mutate the fd;
55 // set O_NONBLOCK before transferring ownership.
56 228x for (int i = 0; i < 2; ++i)
57 {
58 152x int flags = ::fcntl(fds[i], F_GETFL, 0);
59 152x if (flags < 0 || ::fcntl(fds[i], F_SETFL, flags | O_NONBLOCK) < 0)
60 {
61 auto ec = detail::make_err(errno);
62 ::close(fds[0]);
63 ::close(fds[1]);
64 return ec;
65 }
66 }
67
68 76x a_fd = fds[0];
69 76x b_fd = fds[1];
70 76x return {};
71 }
72
73 template<class Socket>
74 std::error_code
75 76x assign_pair(Socket& a, Socket& b, int a_fd, int b_fd) noexcept
76 {
77 try
78 {
79 76x a.assign(a_fd);
80 }
81 catch (std::system_error const& e)
82 {
83 ::close(a_fd);
84 ::close(b_fd);
85 return e.code();
86 }
87
88 try
89 {
90 76x b.assign(b_fd);
91 }
92 catch (std::system_error const& e)
93 {
94 a.close();
95 ::close(b_fd);
96 return e.code();
97 }
98
99 76x return {};
100 }
101
102 #elif BOOST_COROSIO_HAS_IOCP
103
104 // Build a unique sub-directory under temp and return the full socket
105 // path inside it. Empty string on failure.
106 std::string
107 pick_pair_path(std::filesystem::path& dir_out)
108 {
109 namespace fs = std::filesystem;
110
111 thread_local std::mt19937_64 gen{std::random_device{}()};
112
113 for (int attempt = 0; attempt < 16; ++attempt)
114 {
115 auto candidate =
116 fs::temp_directory_path() /
117 ("co_pair_" + std::to_string(gen()));
118 std::error_code ec;
119 if (fs::create_directory(candidate, ec))
120 {
121 dir_out = candidate;
122 return (candidate / "s").string();
123 }
124 }
125 return {};
126 }
127
128 void
129 remove_pair_path(std::filesystem::path const& dir, std::string const& path)
130 {
131 std::error_code ec;
132 std::filesystem::remove(std::filesystem::path(path), ec);
133 std::filesystem::remove(dir, ec);
134 }
135
136 // Synchronously rendezvous two AF_UNIX SOCK_STREAM sockets. The
137 // listener and accept happen on the caller's thread; the connect
138 // runs on a short-lived worker to avoid a deadlock. The returned
139 // sockets are created with WSA_FLAG_OVERLAPPED so they can be
140 // registered with IOCP by assign_socket().
141 std::error_code
142 make_pair_sockets(SOCKET& a_sock, SOCKET& b_sock) noexcept
143 {
144 namespace fs = std::filesystem;
145
146 a_sock = INVALID_SOCKET;
147 b_sock = INVALID_SOCKET;
148
149 fs::path dir;
150 std::string path = pick_pair_path(dir);
151 if (path.empty())
152 return detail::make_err(ERROR_PATH_NOT_FOUND);
153
154 SOCKET listen_sock = ::WSASocketW(
155 AF_UNIX, SOCK_STREAM, 0, nullptr, 0, WSA_FLAG_OVERLAPPED);
156 if (listen_sock == INVALID_SOCKET)
157 {
158 auto ec = detail::make_err(::WSAGetLastError());
159 remove_pair_path(dir, path);
160 return ec;
161 }
162
163 detail::un_sa_t addr{};
164 addr.sun_family = AF_UNIX;
165 std::memcpy(
166 addr.sun_path, path.c_str(),
167 (std::min)(path.size(), sizeof(addr.sun_path) - 1));
168 int addr_len = static_cast<int>(
169 offsetof(detail::un_sa_t, sun_path) + path.size() + 1);
170
171 if (::bind(
172 listen_sock, reinterpret_cast<sockaddr*>(&addr), addr_len)
173 == SOCKET_ERROR)
174 {
175 auto ec = detail::make_err(::WSAGetLastError());
176 ::closesocket(listen_sock);
177 remove_pair_path(dir, path);
178 return ec;
179 }
180
181 if (::listen(listen_sock, 1) == SOCKET_ERROR)
182 {
183 auto ec = detail::make_err(::WSAGetLastError());
184 ::closesocket(listen_sock);
185 remove_pair_path(dir, path);
186 return ec;
187 }
188
189 SOCKET worker_sock = INVALID_SOCKET;
190 std::error_code worker_ec;
191
192 std::thread worker([&] {
193 worker_sock = ::WSASocketW(
194 AF_UNIX, SOCK_STREAM, 0, nullptr, 0, WSA_FLAG_OVERLAPPED);
195 if (worker_sock == INVALID_SOCKET)
196 {
197 worker_ec = detail::make_err(::WSAGetLastError());
198 return;
199 }
200
201 detail::un_sa_t caddr{};
202 caddr.sun_family = AF_UNIX;
203 std::memcpy(
204 caddr.sun_path, path.c_str(),
205 (std::min)(path.size(), sizeof(caddr.sun_path) - 1));
206 int caddr_len = static_cast<int>(
207 offsetof(detail::un_sa_t, sun_path) + path.size() + 1);
208
209 if (::connect(
210 worker_sock, reinterpret_cast<sockaddr*>(&caddr), caddr_len)
211 == SOCKET_ERROR)
212 {
213 worker_ec = detail::make_err(::WSAGetLastError());
214 ::closesocket(worker_sock);
215 worker_sock = INVALID_SOCKET;
216 }
217 });
218
219 SOCKET accept_sock = ::accept(listen_sock, nullptr, nullptr);
220 std::error_code accept_ec;
221 if (accept_sock == INVALID_SOCKET)
222 accept_ec = detail::make_err(::WSAGetLastError());
223
224 worker.join();
225
226 ::closesocket(listen_sock);
227 remove_pair_path(dir, path);
228
229 if (accept_ec)
230 {
231 if (worker_sock != INVALID_SOCKET)
232 ::closesocket(worker_sock);
233 return accept_ec;
234 }
235 if (worker_ec)
236 {
237 ::closesocket(accept_sock);
238 return worker_ec;
239 }
240
241 a_sock = accept_sock;
242 b_sock = worker_sock;
243 return {};
244 }
245
246 std::error_code
247 assign_pair(
248 local_stream_socket& a,
249 local_stream_socket& b,
250 SOCKET a_sock,
251 SOCKET b_sock) noexcept
252 {
253 try
254 {
255 a.assign(static_cast<native_handle_type>(a_sock));
256 }
257 catch (std::system_error const& e)
258 {
259 ::closesocket(a_sock);
260 ::closesocket(b_sock);
261 return e.code();
262 }
263
264 try
265 {
266 b.assign(static_cast<native_handle_type>(b_sock));
267 }
268 catch (std::system_error const& e)
269 {
270 a.close();
271 ::closesocket(b_sock);
272 return e.code();
273 }
274
275 return {};
276 }
277
278 #endif
279
280 } // namespace
281
282 std::error_code
283 32x connect_pair(local_stream_socket& a, local_stream_socket& b) noexcept
284 {
285 32x if (a.is_open() || b.is_open())
286 2x return detail::make_err(
287 #if BOOST_COROSIO_POSIX
288 EISCONN
289 #else
290 WSAEISCONN
291 #endif
292 2x );
293
294 #if BOOST_COROSIO_POSIX
295 30x int a_fd = -1, b_fd = -1;
296 30x if (auto ec = make_pair_fds(SOCK_STREAM, a_fd, b_fd))
297 return ec;
298 30x return assign_pair(a, b, a_fd, b_fd);
299 #elif BOOST_COROSIO_HAS_IOCP
300 SOCKET a_sock = INVALID_SOCKET, b_sock = INVALID_SOCKET;
301 if (auto ec = make_pair_sockets(a_sock, b_sock))
302 return ec;
303 return assign_pair(a, b, a_sock, b_sock);
304 #else
305 return detail::make_err(ENOSYS);
306 #endif
307 }
308
309 #if BOOST_COROSIO_POSIX
310
311 std::error_code
312 48x connect_pair(local_datagram_socket& a, local_datagram_socket& b) noexcept
313 {
314 48x if (a.is_open() || b.is_open())
315 2x return detail::make_err(EISCONN);
316
317 46x int a_fd = -1, b_fd = -1;
318 46x if (auto ec = make_pair_fds(SOCK_DGRAM, a_fd, b_fd))
319 return ec;
320 46x return assign_pair(a, b, a_fd, b_fd);
321 }
322
323 #endif
324
325 } // namespace boost::corosio
326