Skip to content

Commit e20a432

Browse files
authored
fasthttp: multi threading fix reads in the socket in linux (#26478)
1 parent 2a3daec commit e20a432

6 files changed

Lines changed: 82 additions & 35 deletions

File tree

‎vlib/fasthttp/fasthttp.v‎

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ pub:
7171
// ServerConfig bundles the parameters needed to start a fasthttp server.
7272
pub struct ServerConfig {
7373
pub:
74-
port int = 3000
75-
max_request_buffer_size int = 8192
74+
family net.AddrFamily = .ip6
75+
port int = 3000
76+
max_request_buffer_size int = 8192
7677
handler fn (HttpRequest) !HttpResponse @[required]
7778
user_data voidptr
7879
}

‎vlib/fasthttp/fasthttp_darwin.v‎

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -55,19 +55,23 @@ mut:
5555

5656
pub struct Server {
5757
pub mut:
58-
port int
59-
socket_fd int
60-
poll_fd int // kqueue fd
61-
user_data voidptr
62-
request_handler fn (HttpRequest) !HttpResponse @[required]
58+
family net.AddrFamily = .ip6
59+
port int
60+
max_request_buffer_size int = 8192
61+
socket_fd int
62+
poll_fd int // kqueue fd
63+
user_data voidptr
64+
request_handler fn (HttpRequest) !HttpResponse @[required]
6365
}
6466

6567
// new_server creates and initializes a new Server instance.
6668
pub fn new_server(config ServerConfig) !&Server {
6769
mut server := &Server{
68-
port: config.port
69-
user_data: config.user_data
70-
request_handler: config.handler
70+
family: config.family
71+
port: config.port
72+
max_request_buffer_size: config.max_request_buffer_size
73+
user_data: config.user_data
74+
request_handler: config.handler
7175
}
7276
return server
7377
}
@@ -272,7 +276,7 @@ fn accept_clients(kq int, listen_fd int) {
272276

273277
// run starts the server and enters the main event loop (Kqueue version).
274278
pub fn (mut s Server) run() ! {
275-
s.socket_fd = C.socket(net.AddrFamily.ip, net.SocketType.tcp, 0)
279+
s.socket_fd = C.socket(s.family, net.SocketType.tcp, 0)
276280
if s.socket_fd < 0 {
277281
C.perror(c'socket')
278282
return error('socket creation failed')
@@ -304,7 +308,7 @@ pub fn (mut s Server) run() ! {
304308
add_event(s.poll_fd, u64(s.socket_fd), i16(C.EVFILT_READ), u16(C.EV_ADD | C.EV_ENABLE | C.EV_CLEAR),
305309
unsafe { nil })
306310

307-
println('listening on http://localhost:${s.port}/')
311+
println('listening on http://0.0.0.0:${s.port}/')
308312

309313
mut events := [kqueue_max_events]C.kevent{}
310314
for {

‎vlib/fasthttp/fasthttp_linux.v‎

Lines changed: 54 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ struct C.epoll_event {
3434

3535
struct Server {
3636
pub:
37-
port int = 3000
38-
max_request_buffer_size int = 8192
37+
family net.AddrFamily = .ip6
38+
port int = 3000
39+
max_request_buffer_size int = 8192
3940
user_data voidptr
4041
mut:
4142
listen_fds []int = []int{len: max_thread_pool_size, cap: max_thread_pool_size}
@@ -50,6 +51,7 @@ pub fn new_server(config ServerConfig) !&Server {
5051
return error('max_request_buffer_size must be greater than 0')
5152
}
5253
mut server := &Server{
54+
family: config.family
5355
port: config.port
5456
max_request_buffer_size: config.max_request_buffer_size
5557
user_data: config.user_data
@@ -92,9 +94,9 @@ fn close_socket(fd int) bool {
9294
return true
9395
}
9496

95-
fn create_server_socket(port int) int {
97+
fn create_server_socket(server Server) int {
9698
// Create a socket with non-blocking mode
97-
server_fd := C.socket(net.AddrFamily.ip, net.SocketType.tcp, 0)
99+
server_fd := C.socket(server.family, net.SocketType.tcp, 0)
98100
if server_fd < 0 {
99101
eprintln(@LOCATION)
100102
C.perror(c'Socket creation failed')
@@ -118,7 +120,7 @@ fn create_server_socket(port int) int {
118120
return -1
119121
}
120122

121-
addr := net.new_ip(u16(port), [u8(0), 0, 0, 0]!)
123+
addr := net.new_ip(u16(server.port), [u8(0), 0, 0, 0]!)
122124
alen := addr.len()
123125
if C.bind(server_fd, voidptr(&addr), alen) < 0 {
124126
eprintln(@LOCATION)
@@ -225,20 +227,55 @@ fn process_events(mut server Server, epoll_fd int, listen_fd int) {
225227
continue
226228
}
227229
if events[i].events & u32(C.EPOLLIN) != 0 {
228-
bytes_read := C.recv(client_fd, unsafe { &request_buffer[0] }, server.max_request_buffer_size - 1,
229-
0)
230-
if bytes_read > 0 {
230+
// Read all available data from the socket
231+
mut total_bytes_read := 0
232+
mut readed_request_buffer := []u8{len: server.max_request_buffer_size, cap: server.max_request_buffer_size}
233+
234+
for {
235+
bytes_read := C.recv(client_fd, unsafe { &request_buffer[0] }, server.max_request_buffer_size - 1,
236+
0)
237+
if bytes_read < 0 {
238+
if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK {
239+
// No more data available right now
240+
break
241+
}
242+
// Error occurred
243+
eprintln('ERROR: recv() failed with errno=${C.errno}')
244+
break
245+
} else if bytes_read == 0 {
246+
// Connection closed by client
247+
break
248+
}
249+
250+
// Append the received data to the buffer
251+
if total_bytes_read + bytes_read > server.max_request_buffer_size {
252+
// Buffer size exceeded
253+
break
254+
}
255+
unsafe {
256+
readed_request_buffer.push_many(&request_buffer[0], bytes_read)
257+
}
258+
total_bytes_read += bytes_read
259+
260+
// Check if we've received the complete HTTP request (look for \r\n\r\n)
261+
if total_bytes_read >= 4 {
262+
if readed_request_buffer[total_bytes_read - 4] == `\r`
263+
&& readed_request_buffer[total_bytes_read - 3] == `\n`
264+
&& readed_request_buffer[total_bytes_read - 2] == `\r`
265+
&& readed_request_buffer[total_bytes_read - 1] == `\n` {
266+
break
267+
}
268+
}
269+
}
270+
271+
if total_bytes_read > 0 {
231272
// Check if request exceeds buffer size
232-
if bytes_read >= server.max_request_buffer_size - 1 {
273+
if total_bytes_read >= server.max_request_buffer_size - 1 {
233274
C.send(client_fd, status_413_response.data, status_413_response.len,
234275
C.MSG_NOSIGNAL)
235276
handle_client_closure(epoll_fd, client_fd)
236277
continue
237278
}
238-
mut readed_request_buffer := []u8{cap: bytes_read}
239-
unsafe {
240-
readed_request_buffer.push_many(&request_buffer[0], bytes_read)
241-
}
242279
mut decoded_http_request := decode_http_request(readed_request_buffer) or {
243280
eprintln('Error decoding request ${err}')
244281
C.send(client_fd, tiny_bad_request_response.data, tiny_bad_request_response.len,
@@ -340,10 +377,10 @@ fn process_events(mut server Server, epoll_fd int, listen_fd int) {
340377
C.close(fd)
341378
}
342379
// Leave the connection open; closure is driven by client FIN or errors
343-
} else if bytes_read == 0 {
380+
} else if total_bytes_read == 0 {
344381
// Normal client closure (FIN received)
345382
handle_client_closure(epoll_fd, client_fd)
346-
} else if bytes_read < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK {
383+
} else if total_bytes_read < 0 && C.errno != C.EAGAIN && C.errno != C.EWOULDBLOCK {
347384
// Unexpected recv error - send 444 No Response
348385
C.send(client_fd, status_444_response.data, status_444_response.len,
349386
C.MSG_NOSIGNAL)
@@ -361,7 +398,7 @@ pub fn (mut server Server) run() ! {
361398
return
362399
}
363400
for i := 0; i < max_thread_pool_size; i++ {
364-
server.listen_fds[i] = create_server_socket(server.port)
401+
server.listen_fds[i] = create_server_socket(server)
365402
if server.listen_fds[i] < 0 {
366403
return
367404
}
@@ -383,7 +420,7 @@ pub fn (mut server Server) run() ! {
383420
server.threads[i] = spawn process_events(mut server, server.epoll_fds[i], server.listen_fds[i])
384421
}
385422

386-
println('listening on http://localhost:${server.port}/')
423+
println('listening on http://0.0.0.0:${server.port}/')
387424
// Main thread waits for workers; accepts are handled in worker epoll loops
388425
for i in 0 .. max_thread_pool_size {
389426
server.threads[i].wait()

‎vlib/fasthttp/fasthttp_windows.v‎

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,18 @@ module fasthttp
22

33
struct Server {
44
pub:
5-
port int = 3000
5+
port int = 3000
6+
max_request_buffer_size int = 8192
67
mut:
78
request_handler fn (HttpRequest) !HttpResponse @[required]
89
}
910

1011
// new_server creates and initializes a new Server instance.
1112
pub fn new_server(config ServerConfig) !&Server {
1213
mut server := &Server{
13-
port: config.port
14-
request_handler: config.handler
14+
port: config.port
15+
max_request_buffer_size: config.max_request_buffer_size
16+
request_handler: config.handler
1517
}
1618

1719
return server

‎vlib/veb/veb.v‎

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ pub:
7777
port int = default_port
7878
show_startup_message bool = true
7979
timeout_in_seconds int = 30
80+
max_request_buffer_size int = 8192
8081
benchmark_page_generation bool // for the "page rendered in X ms"
8182
}
8283

‎vlib/veb/veb_d_new_veb.v‎

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,11 @@ pub fn run_new[A, X](mut global_app A, params RunParams) ! {
4141

4242
// Configure and run the fasthttp server
4343
mut server := fasthttp.new_server(fasthttp.ServerConfig{
44-
port: params.port
45-
handler: parallel_request_handler[A, X]
46-
user_data: voidptr(request_params)
44+
family: params.family
45+
port: params.port
46+
handler: parallel_request_handler[A, X]
47+
max_request_buffer_size: params.max_request_buffer_size
48+
user_data: voidptr(request_params)
4749
}) or {
4850
eprintln('Failed to create server: ${err}')
4951
return

0 commit comments

Comments
 (0)