Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/beast2
8 : //
9 :
10 : #ifndef BOOST_BEAST2_SERVER_HTTP_STREAM_HPP
11 : #define BOOST_BEAST2_SERVER_HTTP_STREAM_HPP
12 :
13 : #include <boost/beast2/detail/config.hpp>
14 : #include <boost/beast2/log_service.hpp>
15 : #include <boost/beast2/error.hpp>
16 : #include <boost/beast2/format.hpp>
17 : #include <boost/beast2/read.hpp>
18 : #include <boost/beast2/server/any_lambda.hpp>
19 : #include <boost/beast2/server/route_handler_asio.hpp>
20 : #include <boost/beast2/server/router_asio.hpp>
21 : #include <boost/beast2/wrap_executor.hpp>
22 : #include <boost/beast2/write.hpp>
23 : #include <boost/beast2/detail/except.hpp>
24 : #include <boost/capy/application.hpp>
25 : #include <boost/http_proto/request_parser.hpp>
26 : #include <boost/http_proto/response.hpp>
27 : #include <boost/http_proto/serializer.hpp>
28 : #include <boost/http_proto/string_body.hpp>
29 : #include <boost/http_proto/server/basic_router.hpp>
30 : #include <boost/url/parse.hpp>
31 : #include <boost/asio/prepend.hpp>
32 :
33 : namespace boost {
34 : namespace beast2 {
35 :
36 : //------------------------------------------------
37 :
38 : /** An HTTP server stream which routes requests to handlers and sends responses.
39 :
40 : An object of this type wraps an asynchronous Boost.ASIO stream and implements
41 : a high level server connection which reads HTTP requests, routes them to
42 : handlers installed in a router, and sends the HTTP response.
43 :
44 : @par Requires
45 : `AsyncStream` must satisfy <em>AsyncReadStream</em> and <em>AsyncWriteStream</em>
46 :
47 : @tparam AsyncStream The type of asynchronous stream.
48 : */
49 : template<class AsyncStream>
50 : class http_stream
51 : : private http::suspender::owner
52 : {
53 : public:
54 : /** Constructor.
55 :
56 : This initializes a new HTTP connection object that operates on
57 : the given stream, uses the specified router to dispatch incoming
58 : requests, and calls the supplied completion function when the
59 : connection closes or fails.
60 :
61 : Construction does not start any I/O; call @ref on_stream_begin when
62 : the stream is connected to the remote peer to begin reading
63 : requests and processing them.
64 :
65 : @param app The owning application, used to access shared services
66 : such as logging and protocol objects.
67 : @param stream The underlying asynchronous stream to read from
68 : and write to. The caller is responsible for maintaining its
69 : lifetime for the duration of the session.
70 : @param routes The router used to dispatch incoming HTTP requests.
71 : @param close_fn The function invoked when the connection is closed
72 : or an unrecoverable error occurs.
73 : */
74 : http_stream(
75 : capy::application& app,
76 : AsyncStream& stream,
77 : router_asio<AsyncStream&> routes,
78 : any_lambda<void(system::error_code)> close_fn);
79 :
80 : /** Called to start a new HTTP session
81 :
82 : The stream must be in a connected,
83 : correct state for a new session.
84 : */
85 : void on_stream_begin(http::acceptor_config const& config);
86 :
87 : private:
88 : void do_read();
89 : void on_read(
90 : system::error_code ec,
91 : std::size_t bytes_transferred);
92 : void on_headers();
93 : void do_dispatch(http::route_result rv = {});
94 : void do_read_body();
95 : void on_read_body(
96 : system::error_code ec,
97 : std::size_t bytes_transferred);
98 : void do_respond(http::route_result rv);
99 : void do_write();
100 : void on_write(
101 : system::error_code const& ec,
102 : std::size_t bytes_transferred);
103 : void on_complete();
104 : http::resumer do_suspend() override;
105 : void do_resume(http::route_result const& rv) override;
106 : void do_resume(std::exception_ptr ep) override;
107 : void do_close();
108 : void do_fail(core::string_view s,
109 : system::error_code const& ec);
110 : void clear() noexcept;
111 :
112 : protected:
113 0 : std::string id() const
114 : {
115 0 : return std::string("[") + std::to_string(id_) + "] ";
116 : }
117 :
118 : protected:
119 : struct resetter;
120 : section sect_;
121 : std::size_t id_ = 0;
122 : AsyncStream& stream_;
123 : router_asio<AsyncStream&> routes_;
124 : any_lambda<void(system::error_code)> close_;
125 : http::acceptor_config const* pconfig_ = nullptr;
126 :
127 : using work_guard = asio::executor_work_guard<decltype(
128 : std::declval<AsyncStream&>().get_executor())>;
129 : std::unique_ptr<work_guard> pwg_;
130 : asio_route_params<AsyncStream&> rp_;
131 : };
132 :
133 : //------------------------------------------------
134 :
135 : // for exception safety
136 : template<class AsyncStream>
137 : struct http_stream<AsyncStream>::
138 : resetter
139 : {
140 : ~resetter()
141 : {
142 : if(clear_)
143 : owner_.clear();
144 : }
145 :
146 : explicit resetter(
147 : http_stream<AsyncStream>& owner) noexcept
148 : : owner_(owner)
149 : {
150 : }
151 :
152 : void accept()
153 : {
154 : clear_ = false;
155 : }
156 :
157 : private:
158 : http_stream<AsyncStream>& owner_;
159 : bool clear_ = true;
160 : };
161 :
162 : //------------------------------------------------
163 :
164 : template<class AsyncStream>
165 0 : http_stream<AsyncStream>::
166 : http_stream(
167 : capy::application& app,
168 : AsyncStream& stream,
169 : router_asio<AsyncStream&> routes,
170 : any_lambda<void(system::error_code)> close)
171 0 : : sect_(use_log_service(app).get_section("http_stream"))
172 0 : , id_(
173 0 : []() noexcept
174 : {
175 : static std::size_t n = 0;
176 0 : return ++n;
177 0 : }())
178 0 : , stream_(stream)
179 0 : , routes_(std::move(routes))
180 0 : , close_(close)
181 0 : , rp_(stream_)
182 : {
183 0 : rp_.parser = http::request_parser(app);
184 :
185 0 : rp_.serializer = http::serializer(app);
186 0 : rp_.suspend = http::suspender(*this);
187 0 : rp_.ex = wrap_executor(stream_.get_executor());
188 0 : }
189 :
190 : // called to start a new HTTP session.
191 : // the connection must be in the correct state already.
192 : template<class AsyncStream>
193 : void
194 0 : http_stream<AsyncStream>::
195 : on_stream_begin(
196 : http::acceptor_config const& config)
197 : {
198 0 : pconfig_ = &config;
199 :
200 0 : rp_.parser.reset();
201 0 : rp_.session_data.clear();
202 0 : do_read();
203 0 : }
204 :
205 : // begin reading the request
206 : template<class AsyncStream>
207 : void
208 0 : http_stream<AsyncStream>::
209 : do_read()
210 : {
211 0 : rp_.parser.start();
212 :
213 0 : beast2::async_read_some(
214 : stream_,
215 : rp_.parser,
216 0 : call_mf(&http_stream::on_read, this));
217 0 : }
218 :
219 : // called when the read operation completes
220 : template<class AsyncStream>
221 : void
222 0 : http_stream<AsyncStream>::
223 : on_read(
224 : system::error_code ec,
225 : std::size_t bytes_transferred)
226 : {
227 : (void)bytes_transferred;
228 :
229 0 : if(ec.failed())
230 0 : return do_fail("http_stream::on_read", ec);
231 :
232 0 : LOG_TRC(this->sect_)(
233 : "{} http_stream::on_read bytes={}",
234 : this->id(), bytes_transferred);
235 :
236 0 : on_headers();
237 : }
238 :
239 : // called to set up the response after reading the request
240 : template<class AsyncStream>
241 : void
242 0 : http_stream<AsyncStream>::
243 : on_headers()
244 : {
245 : // set up Request and Response objects
246 : // VFALCO HACK for now we make a copy of the message
247 0 : rp_.req = rp_.parser.get();
248 0 : rp_.route_data.clear();
249 0 : rp_.res.set_start_line( // VFALCO WTF
250 : http::status::ok, rp_.req.version());
251 0 : rp_.res.set_keep_alive(rp_.req.keep_alive());
252 0 : rp_.serializer.reset();
253 :
254 : // parse the URL
255 : {
256 0 : auto rv = urls::parse_uri_reference(rp_.req.target());
257 0 : if(rv.has_error())
258 : {
259 : // error parsing URL
260 0 : rp_.status(http::status::bad_request);
261 0 : rp_.set_body("Bad Request: " + rv.error().message());
262 0 : return do_respond(rv.error());
263 : }
264 :
265 0 : rp_.url = rv.value();
266 : }
267 :
268 : // invoke handlers for the route
269 0 : do_dispatch();
270 : }
271 :
272 : // called to dispatch or resume the route
273 : template<class AsyncStream>
274 : void
275 0 : http_stream<AsyncStream>::
276 : do_dispatch(
277 : http::route_result rv)
278 : {
279 0 : if(! rv.failed())
280 : {
281 0 : BOOST_ASSERT(! pwg_); // can't be suspended
282 0 : rv = routes_.dispatch(
283 0 : rp_.req.method(), rp_.url, rp_);
284 : }
285 : else
286 : {
287 0 : rv = routes_.resume(rp_, rv);
288 : }
289 :
290 0 : do_respond(rv);
291 0 : }
292 :
293 : // finish reading the body
294 : template<class AsyncStream>
295 : void
296 0 : http_stream<AsyncStream>::
297 : do_read_body()
298 : {
299 0 : beast2::async_read(
300 : stream_,
301 : rp_.parser,
302 0 : call_mf(&http_stream::on_read_body, this));
303 0 : }
304 :
305 : // called repeatedly when reading the body
306 : template<class AsyncStream>
307 : void
308 0 : http_stream<AsyncStream>::
309 : on_read_body(
310 : system::error_code ec,
311 : std::size_t bytes_transferred)
312 : {
313 0 : if(ec.failed())
314 0 : return do_fail("http_stream::on_read_body", ec);
315 :
316 0 : LOG_TRC(this->sect_)(
317 : "{} http_stream::on_read_body bytes={}",
318 : this->id(), bytes_transferred);
319 :
320 0 : BOOST_ASSERT(rp_.parser.is_complete());
321 :
322 0 : rp_.do_finish();
323 : }
324 :
325 : // called after obtaining a route result
326 : template<class AsyncStream>
327 : void
328 0 : http_stream<AsyncStream>::
329 : do_respond(
330 : http::route_result rv)
331 : {
332 0 : BOOST_ASSERT(rv != http::route::next_route);
333 :
334 0 : if(rv == http::route::close)
335 : {
336 0 : return do_close();
337 : }
338 :
339 0 : if(rv == http::route::complete)
340 : {
341 : // VFALCO what if the connection was closed or keep-alive=false?
342 : // handler sent the response?
343 0 : BOOST_ASSERT(rp_.serializer.is_done());
344 0 : return on_write(system::error_code(), 0);
345 : }
346 :
347 0 : if(rv == http::route::suspend)
348 : {
349 : // didn't call suspend()?
350 0 : if(! pwg_)
351 0 : detail::throw_logic_error();
352 0 : if(rp_.parser.is_body_set())
353 0 : return do_read_body();
354 0 : return;
355 : }
356 :
357 0 : if(rv == http::route::next)
358 : {
359 : // unhandled request
360 0 : auto const status = http::status::not_found;
361 0 : rp_.status(status);
362 0 : rp_.set_body(http::to_string(status));
363 : }
364 0 : else if(rv != http::route::send)
365 : {
366 : // error message of last resort
367 0 : BOOST_ASSERT(rv.failed());
368 0 : BOOST_ASSERT(! http::is_route_result(rv));
369 0 : rp_.status(http::status::internal_server_error);
370 0 : std::string s;
371 0 : format_to(s, "An internal server error occurred: {}", rv.message());
372 0 : rp_.res.set_keep_alive(false); // VFALCO?
373 0 : rp_.set_body(s);
374 0 : }
375 :
376 0 : do_write();
377 : }
378 :
379 : // begin writing the response
380 : template<class AsyncStream>
381 : void
382 0 : http_stream<AsyncStream>::
383 : do_write()
384 : {
385 0 : BOOST_ASSERT(! rp_.serializer.is_done());
386 0 : beast2::async_write(stream_, rp_.serializer,
387 0 : call_mf(&http_stream::on_write, this));
388 0 : }
389 :
390 : // called when the write operation completes
391 : template<class AsyncStream>
392 : void
393 0 : http_stream<AsyncStream>::
394 : on_write(
395 : system::error_code const& ec,
396 : std::size_t bytes_transferred)
397 : {
398 : (void)bytes_transferred;
399 :
400 0 : if(ec.failed())
401 0 : return do_fail("http_stream::on_write", ec);
402 :
403 0 : BOOST_ASSERT(rp_.serializer.is_done());
404 :
405 0 : LOG_TRC(this->sect_)(
406 : "{} http_stream::on_write bytes={}",
407 : this->id(), bytes_transferred);
408 :
409 0 : if(rp_.res.keep_alive())
410 0 : return do_read();
411 :
412 0 : do_close();
413 : }
414 :
415 : template<class AsyncStream>
416 : auto
417 0 : http_stream<AsyncStream>::
418 : do_suspend() ->
419 : http::resumer
420 : {
421 0 : BOOST_ASSERT(stream_.get_executor().running_in_this_thread());
422 :
423 : // can't call twice
424 0 : BOOST_ASSERT(! pwg_);
425 0 : pwg_.reset(new work_guard(stream_.get_executor()));
426 :
427 : // VFALCO cancel timer
428 :
429 0 : return http::resumer(*this);
430 : }
431 :
432 : // called by resume(rv)
433 : template<class AsyncStream>
434 : void
435 0 : http_stream<AsyncStream>::
436 : do_resume(
437 : http::route_result const& rv)
438 : {
439 0 : asio::dispatch(
440 0 : stream_.get_executor(),
441 0 : [this, rv]
442 : {
443 0 : BOOST_ASSERT(pwg_.get() != nullptr);
444 0 : pwg_.reset();
445 :
446 0 : do_dispatch(rv);
447 : });
448 0 : }
449 :
450 : // called by resume(ep)
451 : template<class AsyncStream>
452 : void
453 0 : http_stream<AsyncStream>::
454 : do_resume(
455 : std::exception_ptr ep)
456 : {
457 0 : asio::dispatch(
458 0 : stream_.get_executor(),
459 0 : [this, ep]
460 : {
461 0 : BOOST_ASSERT(pwg_.get() != nullptr);
462 0 : pwg_.reset();
463 :
464 0 : rp_.status(http::status::internal_server_error);
465 : try
466 : {
467 0 : std::rethrow_exception(ep);
468 : }
469 0 : catch(std::exception const& e)
470 : {
471 0 : std::string s;
472 0 : format_to(s, "An internal server error occurred: {}", e.what());
473 0 : rp_.set_body(s);
474 0 : }
475 0 : catch(...)
476 : {
477 0 : rp_.set_body("An internal server error occurred");
478 : }
479 0 : rp_.res.set_keep_alive(false);
480 0 : do_write();
481 : });
482 0 : }
483 :
484 : // called when a non-recoverable error occurs
485 : template<class AsyncStream>
486 : void
487 0 : http_stream<AsyncStream>::
488 : do_fail(
489 : core::string_view s, system::error_code const& ec)
490 : {
491 0 : LOG_TRC(this->sect_)("{}: {}", s, ec.message());
492 :
493 : // tidy up lingering objects
494 0 : rp_.parser.reset();
495 0 : rp_.serializer.reset();
496 :
497 0 : close_(ec);
498 0 : }
499 :
500 : // end the session
501 : template<class AsyncStream>
502 : void
503 0 : http_stream<AsyncStream>::
504 : do_close()
505 : {
506 0 : clear();
507 0 : close_({});
508 0 : }
509 :
510 : // clear everything, releasing transient objects
511 : template<class AsyncStream>
512 : void
513 0 : http_stream<AsyncStream>::
514 : clear() noexcept
515 : {
516 0 : rp_.parser.reset();
517 0 : rp_.serializer.reset();
518 0 : rp_.res.clear();
519 0 : }
520 :
521 : } // beast2
522 : } // boost
523 :
524 : #endif
|