1 /*
2 Copyright (c) 2023-2024 Andrea Fontana
3
4 Permission is hereby granted, free of charge, to any person
5 obtaining a copy of this software and associated documentation
6 files (the "Software"), to deal in the Software without
7 restriction, including without limitation the rights to use,
8 copy, modify, merge, publish, distribute, sublicense, and/or sell
9 copies of the Software, and to permit persons to whom the
10 Software is furnished to do so, subject to the following
11 conditions:
12
13 The above copyright notice and this permission notice shall be
14 included in all copies or substantial portions of the Software.
15
16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
18 OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
20 HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
21 WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
23 OTHER DEALINGS IN THE SOFTWARE.
24 */
25
26 module serverino.communicator;
27
28 import serverino.common;
29 import serverino.databuffer;
30 import serverino.daemon : WorkerInfo;
31 import serverino.config : DaemonConfigPtr;
32 import std.socket;
33
34
35 import std.ascii : toLower;
36 import std.string: join;
37 import std.algorithm : strip;
38 import std.conv : text, to;
39 import std.format : format;
40 import std.experimental.logger : log, info, warning;
41
// Binding to a C-linkage, variadic `syscall` function.
// It is not referenced by any of the code visible in this module.
extern(C) long syscall(long number, ...);
43
44 /*
* The `ProtoRequest` class is a draft of an HTTP request.
46 * Full request will be parsed by the assigned worker.
47 * This is a linked list, so it can be used to store multiple requests.
48 */
package class ProtoRequest
{
    // Value of the `connection` header, as recognized by the first raw parsing
    enum Connection
    {
        Unknown = "unknown",
        KeepAlive = "keep-alive",
        Close = "close"
    }

    // HTTP version found on the request line
    enum HttpVersion
    {
        Unknown = "unknown",
        HTTP_10 = "HTTP/1.0",
        HTTP_11 = "HTTP/1.1"
    }

    // Human readable dump of the request, meant for logging/debugging.
    // It is safe to call at any time, even before the headers have been parsed.
    override string toString() const
    {
        string s;
        s ~= text("VALID: ", isValid, "\n");
        s ~= text("VER: ", httpVersion, "\n");
        s ~= text("METHOD: ", method, "\n");
        s ~= text("PATH: ", path, "\n");
        s ~= text("BODY: ", contentLength, "\n");
        s ~= text("HEADERS:", "\n");

        // `data` begins with a uint.sizeof-bytes length prefix that must be skipped.
        // Until the headers are parsed `headersLength` is still 0: slicing
        // unconditionally would be out of bounds, so the section is left empty.
        if (headersLength > uint.sizeof && headersLength <= data.length)
            s ~= data[uint.sizeof..headersLength];

        s ~= "\n";

        return s;
    }


    bool isValid = false;               // First checks on incoming data can invalidate request

    bool headersDone = false;           // Headers are read
    bool expect100 = false;             // Should we send 100-continue?

    size_t contentLength = 0;           // Content length
    size_t headersLength;               // Length of `data` up to the end of the headers (length prefix included)

    char[] method;                      // HTTP method
    char[] path;                        // Request path

    char[] data;                        // Request data

    ProtoRequest next = null;           // Next request in the queue

    Connection connection = Connection.Unknown;
    HttpVersion httpVersion = HttpVersion.Unknown;
}
99
100 /*
101 * The `Communicator` class receives and sends data to the client.
102 * It is also responsible for the first raw parsing of the incoming data.
* It communicates through a unix socket with the assigned worker to process the request.
104 */
105 package class Communicator
106 {
// States a communicator goes through while serving a client
enum State
{
    READY = 0,          // Waiting to be paired with a client
    PAIRED,             // Paired with a client
    READING_HEADERS,    // Reading headers from client
    READING_BODY,       // Reading body from client
    KEEP_ALIVE          // Keep alive if the client wants to
}
115
116 DaemonConfigPtr config;
117
// Build a communicator bound to the daemon configuration and register it
this(DaemonConfigPtr config)
{
    this.config = config;

    // The position inside `instances` is used as the communicator id
    id = instances.length;
    instances ~= this;

    // No client is attached yet, so the id goes to the `dead` list
    dead.insertBack(id);
}
127
// Announce the size of the response that is going to be written and restart the sent counter
void setResponseLength(size_t s)
{
    responseLength = s;
    responseSent = 0;
}
// True when the number of bytes sent matches the announced response length
bool completed()
{
    return responseSent == responseLength;
}
130
// Detach the client socket (if any) and put the communicator back in the ready state
pragma(inline, true)
void unsetClientSocket()
{
    status = State.READY;

    // Nothing attached, nothing to move between lists
    if (this.clientSkt is null)
        return;

    // The communicator leaves the `alive` list and goes back to the `dead` one
    alive.remove(id);
    dead.insertBack(id);
    this.clientSkt = null;
}
144
// Pair the communicator with a client socket and switch to the paired state.
// `requestId` is stored as a string padded to 6 characters.
pragma(inline, true)
void setClientSocket(Socket s, ulong requestId)
{
    this.requestId = format("%06s", requestId);
    status = State.PAIRED;

    // Only a communicator without a client is moved from `dead` to `alive`;
    // in that case the new socket is also switched to non-blocking mode.
    immutable bool hadNoClient = this.clientSkt is null;

    if (hadNoClient && s !is null)
    {
        dead.remove(id);
        alive.insertFront(id);
        s.blocking = false;
    }

    this.clientSkt = s;
}
162
// Reset the communicator to the initial state: the client socket (if any) is
// shut down and closed, pending outgoing data is dropped and the requests
// queue is cleared.
void reset()
{
    if (clientSkt !is null)
    {
        clientSkt.shutdown(SocketShutdown.BOTH);
        clientSkt.close();
        unsetClientSocket();
    }

    status = State.READY;
    responseLength = 0;
    responseSent = 0;
    leftover.length = 0;

    // Drop any data still waiting to be sent to the old client.
    // Without this, bytes buffered for a connection that was dropped
    // would be flushed to the next client paired with this communicator.
    sendBuffer.clear();
    bufferSent = 0;

    // Clear requests queue, unlinking every node
    while(requestToProcess !is null)
    {
        auto tmp = requestToProcess;
        requestToProcess = requestToProcess.next;
        tmp.next = null;
    }

    requestToProcess = null;

    requestDataReceived = false;
    lastRecv = CoarseTime.zero;
    lastRequest = CoarseTime.zero;

    hasQueuedRequests = false;
}
194
// Release the assigned worker (if there is one) and zero the response counters
pragma(inline, true)
void unsetWorker()
{
    if (auto assigned = this.worker)
    {
        // Break the link on both sides and mark the worker as idling
        assigned.communicator = null;
        assigned.setStatus(WorkerInfo.State.IDLING);
        this.worker = null;
    }

    // Counters are cleared whether or not a worker was attached
    responseLength = 0;
    responseSent = 0;
}
210
// Pair the communicator with a worker and hand it the first request of the queue.
// The queue must not be empty: the head is dereferenced without checks.
pragma(inline, true)
void setWorker(ref WorkerInfo worker)
{
    // Link both sides and mark the worker as busy
    this.worker = worker;
    worker.communicator = this;
    worker.setStatus(WorkerInfo.State.PROCESSING);

    auto head = requestToProcess;

    // The first uint.sizeof bytes of the data are a placeholder:
    // fill them with the size of what follows
    uint payloadSize = cast(uint)(head.data.length) - cast(uint)uint.sizeof;
    head.data[0..uint.sizeof] = (cast(char*)&payloadSize)[0..uint.sizeof];

    // Remember the connection mode: it is needed when the response is completed
    isKeepAlive = head.connection == ProtoRequest.Connection.KeepAlive;
    worker.unixSocket.send(head.data);

    // The request is consumed: advance the queue
    requestToProcess = head.next;
    lastRequest = CoarseTime.currTime;
}
229
// Flush (part of) the buffered data to the client socket.
// At most 32 KiB are sent on each call.
void write()
{
    // End of the chunk to send, capped to what is actually buffered
    immutable chunkEnd = (bufferSent + 32*1024 > sendBuffer.length) ? sendBuffer.length : bufferSent + 32*1024;

    if (chunkEnd == 0)
        return;

    immutable sent = clientSkt.send(sendBuffer.array[bufferSent..chunkEnd]);

    if (sent != Socket.ERROR)
    {
        bufferSent += sent;
        responseSent += sent;

        // Everything was flushed: the buffer can be recycled
        if (bufferSent == sendBuffer.length)
        {
            bufferSent = 0;
            sendBuffer.clear();
        }
    }
    else if (!wouldHaveBlocked)
    {
        // A real error, not just a socket that is not ready.
        // Unlike write(char[]), this path falls through to the completion check below.
        log("Socket Error");
        reset();
    }

    // If the response is completed, unset the worker
    // and if the client is not keep alive, reset the communicator
    if (completed())
    {
        unsetWorker();

        if (!isKeepAlive)
            reset();
    }
}
270
// Send data to the client. What can't be sent right now is buffered
// and flushed later by write().
void write(char[] data)
{
    // The client is gone
    if (clientSkt is null)
    {
        reset();
        return;
    }

    // Something is already waiting: queue the new data behind it and try to flush
    if (sendBuffer.length != 0)
    {
        sendBuffer.append(data);
        write();
        return;
    }

    auto sent = clientSkt.send(data);

    if (sent != Socket.ERROR)
    {
        responseSent += sent;

        // Partial send: keep the tail for later
        if (sent < data.length) sendBuffer.append(data[sent..data.length]);
    }
    else if (wouldHaveBlocked)
    {
        // Socket not ready: keep everything for later
        sendBuffer.append(data);
    }
    else
    {
        log("Socket error on write. ", lastSocketError);
        reset();
        return;
    }

    // If the response is completed, unset the worker
    // and if the client is not keep alive, reset the communicator
    if (completed())
    {
        unsetWorker();

        if (!isKeepAlive)
            reset();
    }
}
316
// Read the data from the client socket and do the first raw parsing of it.
//
// Params:
//   fromBuffer = if true nothing is read from the socket and only the data
//                stored in `leftover` by a previous call is parsed.
//
// Malformed or oversized requests are answered with a 400/413 status line
// and the communicator is reset. A request is marked as valid once its
// headers (and its body, if any) are completely received.
void read(bool fromBuffer = false)
{
    import std.string: indexOf;

    // Create a new request if the current one is completed
    if (status == State.PAIRED || status == State.KEEP_ALIVE)
    {
        // Queue a new request
        if (requestToProcess is null) requestToProcess = new ProtoRequest();
        else
        {
            ProtoRequest tmp = requestToProcess;
            while(tmp.next !is null)
                tmp = tmp.next;

            tmp.next = new ProtoRequest();

            // NOTE(review): this replaces the head of the queue with the new tail, so
            // requests that were still queued are not reachable from `requestToProcess`
            // anymore. Confirm that callers never read while requests are pending.
            requestToProcess = tmp.next;
        }

        status = State.READING_HEADERS;
    }

    ProtoRequest request = requestToProcess;

    // The first uint.sizeof bytes are a placeholder for the length of the request.
    // They will be overwritten by the actual length of the request
    // and sent to the worker through the unix socket
    uint len = 0;
    if(request.data.length == 0)
    {
        request.data ~= (cast(char*)(&len))[0..uint.sizeof];
    }

    char[32*1024] buffer;
    ptrdiff_t bytesRead = 0;

    // Read the data from the client socket if it's not buffered
    // `requestDataReceived` is set when the first data is read, to check for timeouts
    if (!fromBuffer)
    {
        bytesRead = clientSkt.receive(buffer);

        if (bytesRead < 0)
        {
            status = State.READY;
            log("Socket error on read. ", lastSocketError);
            reset();
            return;
        }

        if (bytesRead == 0)
        {
            // Connection closed.
            status = State.READY;
            reset();
            return;
        }
        else if (requestDataReceived == false) requestDataReceived = true;

    }
    else
    {
        bytesRead = 0;
        requestDataReceived = true;
    }

    auto bufferRead = buffer[0..bytesRead];

    // If there's leftover data from the previous read, prepend it to the current buffer
    if (leftover.length)
    {
        bufferRead = leftover ~ bufferRead;
        leftover.length = 0;
    }

    bool tryParse = true;
    while(tryParse)
    {
        bool enqueueNewRequest = false;
        tryParse = false;

        if (status == State.READING_HEADERS)
        {
            // We are still waiting for the headers to be completed
            if (!request.headersDone)
            {
                auto headersEnd = bufferRead.indexOf("\r\n\r\n");

                // Are the headers completed?
                if (headersEnd >= 0)
                {
                    if (headersEnd > config.maxRequestSize)
                    {
                        clientSkt.send("HTTP/1.0 413 Request Entity Too Large\r\n");
                        reset();
                        return;
                    }

                    import std.algorithm : splitter, map, joiner;

                    // Extra data after the headers is stored in the leftover buffer
                    request.data ~= bufferRead[0..headersEnd];
                    leftover = bufferRead[headersEnd+4..$];

                    // The headers are completed
                    bufferRead.length = 0;
                    request.headersDone = true;
                    request.isValid = true;

                    auto firstLine = request.data.indexOf("\r\n");

                    // HACK: A single line (http 1.0?) request.
                    if (firstLine < 0)
                    {
                        firstLine = request.data.length;
                        request.data ~= "\r\n";
                    }

                    // The length prefix is included: uint.sizeof + "GET / HTTP/1.x".length
                    if (firstLine < 18)
                    {
                        request.isValid = false;
                        clientSkt.send("HTTP/1.0 400 Bad Request\r\n");
                        debug warning("Bad Request. Request line too short.");
                        reset();
                        return;
                    }

                    auto fields = request.data[uint.sizeof..firstLine].splitter(' ');
                    size_t popped = 0;

                    if (!fields.empty)
                    {
                        request.method = fields.front;
                        fields.popFront;
                        popped++;
                    }

                    if (!fields.empty)
                    {
                        request.path = fields.front;
                        fields.popFront;
                        popped++;
                    }

                    if (!fields.empty)
                    {
                        request.httpVersion = cast(ProtoRequest.HttpVersion)fields.front;
                        fields.popFront;
                        popped++;
                    }

                    // HTTP version must be 1.0 or 1.1
                    if (request.httpVersion != ProtoRequest.HttpVersion.HTTP_10 && request.httpVersion != ProtoRequest.HttpVersion.HTTP_11)
                    {
                        request.isValid = false;
                        clientSkt.send("HTTP/1.0 400 Bad Request\r\n");
                        debug warning("Bad Request. Http version unknown.");
                        reset();
                        return;
                    }

                    if (popped != 3 || !fields.empty)
                    {
                        request.isValid = false;
                        clientSkt.send("HTTP/1.0 400 Bad Request\r\n");
                        debug warning("Bad Request. Malformed request line.");
                        reset();
                        return;
                    }

                    if (request.path[0] != '/')
                    {
                        request.isValid = false;
                        clientSkt.send("HTTP/1.0 400 Bad Request\r\n");
                        debug warning("Bad Request. Absolute uri?");
                        reset();
                        return;
                    }

                    // Parse headers for 100-continue, content-length and connection
                    auto hdrs = request.data[firstLine+2..$]
                    .splitter("\r\n")
                    .map!((char[] row)
                    {
                        if (!request.isValid)
                            return (char[]).init;

                        auto headerColon = row.indexOf(':');

                        if (headerColon < 0)
                        {
                            request.isValid = false;
                            return (char[]).init;
                        }

                        // Headers keys are case insensitive, so we lowercase them
                        // We strip the leading and trailing spaces from both key and value
                        char[] key = cast(char[])row[0..headerColon].strip!(x =>x==' ' || x=='\t');
                        char[] value = cast(char[])row[headerColon+1..$].strip!(x =>x==' '|| x=='\t');

                        // Fast way to lowercase the key. Check if it is ASCII only.
                        // NOTE(review): only bytes above 0xF9 are rejected, while ASCII
                        // ends at 0x7F. Confirm the intended threshold.
                        foreach(idx, ref k; key)
                        {
                            if (k > 0xF9)
                            {
                                request.isValid = false;
                                return (char[]).init;
                            }
                            else if (k >= 'A' && k <= 'Z')
                                k |= 32;
                        }

                        foreach(idx, ref k; value)
                        {
                            if (k > 0xF9)
                            {
                                request.isValid = false;
                                return (char[]).init;
                            }
                        }

                        if (key.length == 0 || value.length == 0)
                        {
                            request.isValid = false;
                            return (char[]).init;
                        }

                        // 100-continue
                        if (key == "expect" && value.length == 12 && value[0..4] == "100-") request.expect100 = true;
                        else if (key == "connection")
                        {
                            import std.uni: sicmp;

                            if (sicmp(value, "keep-alive") == 0) request.connection = ProtoRequest.Connection.KeepAlive;
                            else if (sicmp(value, "close") == 0) request.connection = ProtoRequest.Connection.Close;
                            else request.connection = ProtoRequest.connection.Unknown;
                        }
                        else
                            try { if (key == "content-length") request.contentLength = value.to!size_t; }
                            catch (Exception e) { request.isValid = false; return (char[]).init; }

                        return key ~ ":" ~ value;
                    })
                    .join("\r\n") ~ "\r\n\r\n";


                    // Raw headers are replaced by the normalized ones
                    request.data.length = firstLine+2 + hdrs.length;

                    // If required by configuration, add the remote ip to the headers
                    // It is disabled by default, as it is a slow operation and it is not always needed
                    string ra = string.init;
                    if(config.withRemoteIp)
                    {
                        ra = "x-remote-ip:" ~ clientSkt.remoteAddress().toAddrString() ~ "\r\n";
                        request.data.length += ra.length;
                        request.data[firstLine+2..firstLine+2+ra.length] = ra;
                    }

                    request.data[firstLine+2+ra.length..$] = hdrs[0..$];

                    if (request.isValid == false)
                    {
                        clientSkt.send("HTTP/1.0 400 Bad Request\r\n");
                        debug warning("Bad Request. Malformed request.");
                        reset();
                        return;
                    }

                    // Keep alive is the default for HTTP/1.1, close for HTTP/1.0
                    if (request.connection == ProtoRequest.Connection.Unknown)
                    {
                        if (request.httpVersion == ProtoRequest.HttpVersion.HTTP_11) request.connection = ProtoRequest.Connection.KeepAlive;
                        else request.connection = ProtoRequest.Connection.Close;
                    }

                    request.headersLength = request.data.length;

                    // If the request has a body, we need to read it
                    if (request.contentLength != 0)
                    {
                        if (request.headersLength + request.contentLength > config.maxRequestSize)
                        {
                            clientSkt.send("HTTP/1.0 413 Request Entity Too Large\r\n");
                            reset();
                            return;
                        }

                        // The request is not valid again until the whole body is received
                        request.data.reserve(request.headersLength + request.contentLength);
                        request.isValid = false;
                        tryParse = true;
                        status = State.READING_BODY;

                        // If required, we send the 100-continue response now
                        if (request.expect100)
                            clientSkt.send(cast(char[])(request.httpVersion ~ " 100 continue\r\n\r\n"));
                    }
                    else
                    {
                        // No body, we can process the request
                        requestDataReceived = false;
                        hasQueuedRequests = leftover.length > 0;
                        enqueueNewRequest = hasQueuedRequests;

                        if (request.connection == ProtoRequest.Connection.KeepAlive) status = State.KEEP_ALIVE;
                        else status = State.READY;

                    }
                }
                else
                {
                    // Headers are still incomplete. Without a limit here a client that never
                    // sends the headers terminator could make the buffer grow forever:
                    // the size check above is reached only when the terminator is found.
                    if (bufferRead.length > config.maxRequestSize)
                    {
                        clientSkt.send("HTTP/1.0 413 Request Entity Too Large\r\n");
                        reset();
                        return;
                    }

                    // Keep a copy: `bufferRead` may be a slice of the local buffer
                    leftover = bufferRead.dup;
                }
            }

        }
        else if (status == State.READING_BODY)
        {
            // We are reading the body of the request
            request.data ~= leftover;
            request.data ~= bufferRead;

            leftover.length = 0;
            bufferRead.length = 0;

            if (request.data.length >= request.headersLength + request.contentLength)
            {
                // We read the whole body, process the request
                // What exceeds the body belongs to the next request
                requestDataReceived = false;
                leftover = request.data[request.headersLength + request.contentLength..$];
                request.data = request.data[0..request.headersLength + request.contentLength];
                request.isValid = true;

                hasQueuedRequests = leftover.length > 0;
                enqueueNewRequest = hasQueuedRequests;

                if (request.connection == ProtoRequest.Connection.KeepAlive) status = State.KEEP_ALIVE;
                else status = State.READY;

            }
        }

        if (enqueueNewRequest)
        {
            // There's a (partial) new request in the buffer, we need to create a new request
            request.next = new ProtoRequest();
            request = request.next;
            status = State.READING_HEADERS;

            // Placeholder for the length, as done for the first request
            len = 0;
            request.data.reserve(1024*10);
            request.data ~= (cast(char*)(&len))[0..uint.sizeof];

            bufferRead = leftover;
            leftover.length = 0;

            // We try to parse the new request immediately
            tryParse = true;
        }
    }

}
676
DataBuffer!char sendBuffer;         // Data waiting to be sent to the client
size_t bufferSent;                  // How many bytes of `sendBuffer` have already been sent
string requestId;                   // Request id, formatted by setClientSocket()

bool hasQueuedRequests = false;     // Set by read() when data was left after a completed request
bool requestDataReceived;           // Set by read() when data for the request in progress has been received
bool isKeepAlive;                   // Set by setWorker() from the connection mode of the request handed to the worker
size_t responseSent;                // Bytes of the response sent so far
size_t responseLength;              // Length of the response, see setResponseLength()
size_t id;                          // Index of this communicator inside `instances`
Socket clientSkt;                   // Client socket, null if not paired

ProtoRequest requestToProcess;      // First request of the queue
WorkerInfo worker;                  // Assigned worker, null if none
char[] leftover;                    // Data received but not consumed yet by the parser

CoarseTime lastRecv = CoarseTime.zero;      // Cleared by reset(); not updated elsewhere in this class
CoarseTime lastRequest = CoarseTime.zero;   // Set by setWorker() when a request is sent to the worker

static SimpleList alive;            // Ids of the communicators with a client socket
static SimpleList dead;             // Ids of the communicators without a client socket
Communicator.State status;          // Current state
static Communicator[] instances;    // Every communicator ever created, indexed by id
700 }