1 /*
2 Copyright (c) 2023-2024 Andrea Fontana
3 
4 Permission is hereby granted, free of charge, to any person
5 obtaining a copy of this software and associated documentation
6 files (the "Software"), to deal in the Software without
7 restriction, including without limitation the rights to use,
8 copy, modify, merge, publish, distribute, sublicense, and/or sell
9 copies of the Software, and to permit persons to whom the
10 Software is furnished to do so, subject to the following
11 conditions:
12 
13 The above copyright notice and this permission notice shall be
14 included in all copies or substantial portions of the Software.
15 
16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
18 OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
20 HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
21 WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
23 OTHER DEALINGS IN THE SOFTWARE.
24 */
25 
26 module serverino.communicator;
27 
28 import serverino.common;
29 import serverino.databuffer;
30 import serverino.daemon : WorkerInfo;
31 import serverino.config : DaemonConfigPtr;
32 import std.socket;
33 
34 
35 import std.ascii : toLower;
36 import std.string: join;
37 import std.algorithm : strip;
38 import std.conv : text, to;
39 import std.format : format;
40 import std.experimental.logger : log, info, warning;
41 
42 extern(C) long syscall(long number, ...);
43 
44 /*
45  * The `ProtoRequest` class is a draft of a HTTP request.
46  * Full request will be parsed by the assigned worker.
47  * This is a linked list, so it can be used to store multiple requests.
48  */
49 package class ProtoRequest
50 {
51    enum Connection
52    {
53       Unknown = "unknown",
54       KeepAlive = "keep-alive",
55       Close = "close"
56    }
57 
58    enum HttpVersion
59    {
60       Unknown = "unknown",
61       HTTP_10 = "HTTP/1.0",
62       HTTP_11 = "HTTP/1.1"
63    }
64 
65    override string toString() const
66    {
67       string s;
68       s ~= text("VALID: ", isValid, "\n");
69       s ~= text("VER: ", httpVersion, "\n");
70       s ~= text("METHOD: ", method, "\n");
71       s ~= text("PATH: ", path, "\n");
72       s ~= text("BODY: ", contentLength, "\n");
73       s ~= text("HEADERS:", "\n");
74       s ~= (data[uint.sizeof..headersLength]);
75       s ~= "\n";
76 
77       return s;
78    }
79 
80 
81    bool     isValid = false;     // First checks on incoming data can invalidate request
82 
83    bool     headersDone = false; // Headers are read
84    bool     expect100 = false;   // Should we send 100-continue?
85 
86    size_t   contentLength = 0;   // Content length
87    size_t   headersLength;
88 
89    char[]   method;              // HTTP method
90    char[]   path;                // Request path
91 
92    char[]   data;                // Request data
93 
94    ProtoRequest next = null;     // Next request in the queue
95 
96    Connection  connection = Connection.Unknown;
97    HttpVersion httpVersion = HttpVersion.Unknown;
98 }
99 
100 /*
101  * The `Communicator` class receives and sends data to the client.
102  * It is also responsible for the first raw parsing of the incoming data.
103  * It communicates thru a unix socket with the assigned worker to process the request..
104 */
105 package class Communicator
106 {
107    enum State
108    {
109       READY = 0,        // Waiting to be paired with a client
110       PAIRED,           // Paired with a client
111       READING_HEADERS,  // Reading headers from client
112       READING_BODY,     // Reading body from client
113       KEEP_ALIVE        // Keep alive if the client wants to
114    }
115 
116    DaemonConfigPtr config;
117 
118    this(DaemonConfigPtr config) {
119 
120       // Every new instance is added to the list of instances
121       id = instances.length;
122       instances ~= this;
123       dead.insertBack(id);
124       this.config = config;
125 
126    }
127 
128    void setResponseLength(size_t s) { responseLength = s; responseSent = 0; }
129    bool completed() { return responseLength == responseSent; }
130 
131    // Unset the client socket and move the communicator to the ready state
132    pragma(inline, true)
133    void unsetClientSocket()
134    {
135       status = State.READY;
136 
137       if (this.clientSkt !is null)
138       {
139          alive.remove(id);
140          dead.insertBack(id);
141          this.clientSkt = null;
142       }
143    }
144 
145    // Assign a client socket to the communicator and move it to the paired state
146    pragma(inline, true)
147    void setClientSocket(Socket s, ulong requestId)
148    {
149       this.requestId = format("%06s", requestId);
150 
151       status = State.PAIRED;
152 
153       if (s !is null && this.clientSkt is null)
154       {
155          dead.remove(id);
156          alive.insertFront(id);
157          s.blocking = false;
158       }
159 
160       this.clientSkt = s;
161    }
162 
163    // Reset the communicator to the initial state and clear the requests queue
164    void reset()
165    {
166       if (clientSkt !is null)
167       {
168          clientSkt.shutdown(SocketShutdown.BOTH);
169          clientSkt.close();
170          unsetClientSocket();
171       }
172 
173       status = State.READY;
174       responseLength = 0;
175       responseSent = 0;
176       leftover.length = 0;
177 
178       // Clear requests queue
179       while(requestToProcess !is null)
180       {
181          auto tmp = requestToProcess;
182          requestToProcess = requestToProcess.next;
183          tmp.next = null;
184       }
185 
186       requestToProcess = null;
187 
188       requestDataReceived = false;
189       lastRecv = CoarseTime.zero;
190       lastRequest = CoarseTime.zero;
191 
192       hasQueuedRequests = false;
193    }
194 
195    // If this communicator has a worker assigned, unset it
196    pragma(inline, true)
197    void unsetWorker()
198    {
199 
200       if (this.worker !is null)
201       {
202          this.worker.communicator = null;
203          this.worker.setStatus(WorkerInfo.State.IDLING);
204          this.worker = null;
205       }
206 
207       responseLength = 0;
208       responseSent = 0;
209    }
210 
211    // Assign a worker to the communicator
212    pragma(inline, true)
213    void setWorker(ref WorkerInfo worker)
214    {
215       this.worker = worker;
216       worker.communicator = this;
217 
218       worker.setStatus(WorkerInfo.State.PROCESSING);
219       auto current = requestToProcess;
220       uint len = cast(uint)(current.data.length) - cast(uint)uint.sizeof;
221       current.data[0..uint.sizeof] = (cast(char*)&len)[0..uint.sizeof];
222 
223       isKeepAlive = current.connection == ProtoRequest.Connection.KeepAlive;
224       worker.unixSocket.send(current.data);
225 
226       requestToProcess = requestToProcess.next;
227       lastRequest = CoarseTime.currTime;
228    }
229 
230    // Write the buffered data to the client socket
231    void write()
232    {
233       auto maxToSend = bufferSent + 32*1024;
234       if (maxToSend > sendBuffer.length) maxToSend = sendBuffer.length;
235 
236       if (maxToSend == 0)
237          return;
238 
239       immutable sent = clientSkt.send(sendBuffer.array[bufferSent..maxToSend]);
240 
241       if (sent == Socket.ERROR)
242       {
243          if(!wouldHaveBlocked)
244          {
245             log("Socket Error");
246             reset();
247          }
248       }
249       else
250       {
251          bufferSent += sent;
252          responseSent += sent;
253          if(bufferSent == sendBuffer.length)
254          {
255             bufferSent = 0;
256             sendBuffer.clear();
257          }
258       }
259 
260       // If the response is completed, unset the worker
261       // and if the client is not keep alive, reset the communicator
262       if (completed())
263       {
264          unsetWorker();
265 
266          if (!isKeepAlive)
267             reset();
268       }
269    }
270 
271    // Try to write the data to the client socket, it buffers the data if the socket is not ready
272    void write(char[] data)
273    {
274       if (clientSkt is null)
275       {
276          reset();
277          return;
278       }
279 
280       if (sendBuffer.length == 0)
281       {
282          auto sent = clientSkt.send(data);
283 
284          if (sent == Socket.ERROR)
285          {
286             if(!wouldHaveBlocked)
287             {
288                log("Socket error on write. ", lastSocketError);
289                reset();
290                return;
291             }
292             else sendBuffer.append(data);
293          }
294          else
295          {
296             responseSent += sent;
297             if (sent < data.length) sendBuffer.append(data[sent..data.length]);
298          }
299 
300          // If the response is completed, unset the worker
301          // and if the client is not keep alive, reset the communicator
302          if (completed())
303          {
304             unsetWorker();
305 
306             if (!isKeepAlive)
307                reset();
308          }
309       }
310       else
311       {
312          sendBuffer.append(data);
313          write();
314       }
315    }
316 
317    // Read the data from the client socket and parse the incoming data
318    void read(bool fromBuffer = false)
319    {
320       import std.string: indexOf;
321 
322       // Create a new request if the current one is completed
323       if (status == State.PAIRED || status == State.KEEP_ALIVE)
324       {
325          // Queue a new request
326          if (requestToProcess is null) requestToProcess = new ProtoRequest();
327          else
328          {
329             ProtoRequest tmp = requestToProcess;
330             while(tmp.next !is null)
331                tmp = tmp.next;
332 
333             tmp.next = new ProtoRequest();
334             requestToProcess = tmp.next;
335          }
336 
337 	      status = State.READING_HEADERS;
338       }
339 
340       ProtoRequest request = requestToProcess;
341 
342       // First 2 bytes are the length of the request
343       // They will be overwritten by the actual length of the request
344       // and sent to the worker through the unix socket
345       uint len = 0;
346       if(request.data.length == 0)
347       {
348 	      request.data ~= (cast(char*)(&len))[0..uint.sizeof];
349       }
350 
351       char[32*1024] buffer;
352       ptrdiff_t bytesRead = 0;
353 
354       // Read the data from the client socket if it's not buffered
355       // Set the started flag to true if the first data is read to check for timeouts
356       if (!fromBuffer)
357       {
358          bytesRead = clientSkt.receive(buffer);
359 
360          if (bytesRead < 0)
361          {
362             status = State.READY;
363             log("Socket error on read. ", lastSocketError);
364             reset();
365             return;
366          }
367 
368          if (bytesRead == 0)
369          {
370             // Connection closed.
371             status = State.READY;
372             reset();
373             return;
374          }
375          else if (requestDataReceived == false) requestDataReceived = true;
376 
377       }
378       else
379       {
380          bytesRead = 0;
381          requestDataReceived = true;
382       }
383 
384       auto bufferRead = buffer[0..bytesRead];
385 
386       // If there's leftover data from the previous read, append it to the current buffer
387       if (leftover.length)
388       {
389          bufferRead = leftover ~ bufferRead;
390          leftover.length = 0;
391       }
392 
393       bool tryParse = true;
394       while(tryParse)
395       {
396          bool enqueueNewRequest = false;
397          tryParse = false;
398 
399          if (status == State.READING_HEADERS)
400          {
401             // We are still waiting for the headers to be completed
402             if (!request.headersDone)
403             {
404                auto headersEnd = bufferRead.indexOf("\r\n\r\n");
405 
406                // Are the headers completed?
407                if (headersEnd >= 0)
408                {
409                   if (headersEnd > config.maxRequestSize)
410                   {
411                      clientSkt.send("HTTP/1.0 413 Request Entity Too Large\r\n");
412                      reset();
413                      return;
414                   }
415 
416                   import std.algorithm : splitter, map, joiner;
417 
418                   // Extra data after the headers is stored in the leftover buffer
419                   request.data ~= bufferRead[0..headersEnd];
420                   leftover = bufferRead[headersEnd+4..$];
421 
422                   // The headers are completed
423                   bufferRead.length = 0;
424                   request.headersDone = true;
425                   request.isValid = true;
426 
427                   auto firstLine = request.data.indexOf("\r\n");
428 
429                   // HACK: A single line (http 1.0?) request.
430                   if (firstLine < 0)
431                   {
432                      firstLine = request.data.length;
433                      request.data ~= "\r\n";
434                   }
435 
436                   if (firstLine < 18)
437                   {
438                      request.isValid = false;
439                      clientSkt.send("HTTP/1.0 400 Bad Request\r\n");
440                      debug warning("Bad Request. Request line too short.");
441                      reset();
442                      return;
443                   }
444 
445                   auto fields = request.data[uint.sizeof..firstLine].splitter(' ');
446                   size_t popped = 0;
447 
448                   if (!fields.empty)
449                   {
450                      request.method = fields.front;
451                      fields.popFront;
452                      popped++;
453                   }
454 
455                   if (!fields.empty)
456                   {
457                      request.path = fields.front;
458                      fields.popFront;
459                      popped++;
460                   }
461 
462                   if (!fields.empty)
463                   {
464                      request.httpVersion = cast(ProtoRequest.HttpVersion)fields.front;
465                      fields.popFront;
466                      popped++;
467                   }
468 
469                   // HTTP version must be 1.0 or 1.1
470                   if (request.httpVersion != ProtoRequest.HttpVersion.HTTP_10 && request.httpVersion != ProtoRequest.HttpVersion.HTTP_11)
471                   {
472                      request.isValid = false;
473                      clientSkt.send("HTTP/1.0 400 Bad Request\r\n");
474                      debug warning("Bad Request. Http version unknown.");
475                      reset();
476                      return;
477                   }
478 
479                   if (popped != 3 || !fields.empty)
480                   {
481                      request.isValid = false;
482                      clientSkt.send("HTTP/1.0 400 Bad Request\r\n");
483                      debug warning("Bad Request. Malformed request line.");
484                      reset();
485                      return;
486                   }
487 
488                   if (request.path[0] != '/')
489                   {
490                      request.isValid = false;
491                      clientSkt.send("HTTP/1.0 400 Bad Request\r\n");
492                      debug warning("Bad Request. Absolute uri?");
493                      reset();
494                      return;
495                   }
496 
497                   // Parse headers for 100-continue, content-length and connection
498                   auto hdrs = request.data[firstLine+2..$]
499                   .splitter("\r\n")
500                   .map!((char[] row)
501                   {
502                      if (!request.isValid)
503                         return (char[]).init;
504 
505                      auto headerColon = row.indexOf(':');
506 
507                      if (headerColon < 0)
508                      {
509                         request.isValid = false;
510                         return (char[]).init;
511                      }
512 
513                      // Headers keys are case insensitive, so we lowercase them
514                      // We strip the leading and trailing spaces from both key and value
515                      char[] key = cast(char[])row[0..headerColon].strip!(x =>x==' ' || x=='\t');
516                      char[] value = cast(char[])row[headerColon+1..$].strip!(x =>x==' '|| x=='\t');
517 
518                      // Fast way to lowercase the key. Check if it is ASCII only.
519                      foreach(idx, ref k; key)
520                      {
521                         if (k > 0xF9)
522                         {
523                            request.isValid = false;
524                            return (char[]).init;
525                         }
526                         else if (k >= 'A' && k <= 'Z')
527                           k |= 32;
528                      }
529 
530                      foreach(idx, ref k; value)
531                      {
532                         if (k > 0xF9)
533                         {
534                            request.isValid = false;
535                            return (char[]).init;
536                         }
537                      }
538 
539                      if (key.length == 0 || value.length == 0)
540                      {
541                         request.isValid = false;
542                         return (char[]).init;
543                      }
544 
545                      // 100-continue
546                      if (key == "expect" && value.length == 12 && value[0..4] == "100-") request.expect100 = true;
547                      else if (key == "connection")
548                      {
549                         import std.uni: sicmp;
550 
551                         if (sicmp(value, "keep-alive") == 0) request.connection = ProtoRequest.Connection.KeepAlive;
552                         else if (sicmp(value, "close") == 0) request.connection = ProtoRequest.Connection.Close;
553                         else request.connection = ProtoRequest.connection.Unknown;
554                      }
555                      else
556                      try { if (key == "content-length") request.contentLength = value.to!size_t; }
557                      catch (Exception e) { request.isValid = false; return (char[]).init; }
558 
559                      return key ~ ":" ~ value;
560                   })
561                   .join("\r\n") ~ "\r\n\r\n";
562 
563 
564                   request.data.length = firstLine+2 + hdrs.length;
565 
566                   // If required by configuration, add the remote ip to the headers
567                   // It is disabled by default, as it is a slow operation and it is not always needed
568                   string ra = string.init;
569                   if(config.withRemoteIp)
570                   {
571                      ra = "x-remote-ip:" ~ clientSkt.remoteAddress().toAddrString() ~ "\r\n";
572                      request.data.length += ra.length;
573                      request.data[firstLine+2..firstLine+2+ra.length] = ra;
574                   }
575 
576                   request.data[firstLine+2+ra.length..$] = hdrs[0..$];
577 
578                   if (request.isValid == false)
579                   {
580                      clientSkt.send("HTTP/1.0 400 Bad Request\r\n");
581                      debug warning("Bad Request. Malformed request.");
582                      reset();
583                      return;
584                   }
585 
586                   // Keep alive is the default for HTTP/1.1, close for HTTP/1.0
587                   if (request.connection == ProtoRequest.Connection.Unknown)
588                   {
589                      if (request.httpVersion == ProtoRequest.HttpVersion.HTTP_11) request.connection = ProtoRequest.Connection.KeepAlive;
590                      else request.connection = ProtoRequest.Connection.Close;
591                   }
592 
593                   request.headersLength = request.data.length;
594 
595                   // If the request has a body, we need to read it
596                   if (request.contentLength != 0)
597                   {
598                      if (request.headersLength + request.contentLength  > config.maxRequestSize)
599                      {
600                         clientSkt.send("HTTP/1.0 413 Request Entity Too Large\r\n");
601                         reset();
602                         return;
603                      }
604 
605                      request.data.reserve(request.headersLength + request.contentLength);
606                      request.isValid = false;
607                      tryParse = true;
608 		               status = State.READING_BODY;
609 
610                      // If required, we send the 100-continue response now
611                      if (request.expect100)
612                         clientSkt.send(cast(char[])(request.httpVersion ~ " 100 continue\r\n\r\n"));
613                   }
614                   else
615                   {
616                      // No body, we can process the request
617                      requestDataReceived = false;
618                      hasQueuedRequests = leftover.length > 0;
619                      enqueueNewRequest = hasQueuedRequests;
620 
621                      if (request.connection == ProtoRequest.Connection.KeepAlive) status = State.KEEP_ALIVE;
622                      else status = State.READY;
623 
624                   }
625                }
626                else leftover = bufferRead.dup;
627             }
628 
629          }
630          else if (status == State.READING_BODY)
631          {
632             // We are reading the body of the request
633             request.data ~= leftover;
634             request.data ~= bufferRead;
635 
636             leftover.length = 0;
637             bufferRead.length = 0;
638 
639             if (request.data.length >= request.headersLength + request.contentLength)
640             {
641                // We read the whole body, process the request
642                requestDataReceived = false;
643                leftover = request.data[request.headersLength + request.contentLength..$];
644                request.data = request.data[0..request.headersLength + request.contentLength];
645                request.isValid = true;
646 
647                hasQueuedRequests = leftover.length > 0;
648                enqueueNewRequest = hasQueuedRequests;
649 
650                if (request.connection == ProtoRequest.Connection.KeepAlive) status = State.KEEP_ALIVE;
651                else status = State.READY;
652 
653             }
654          }
655 
656          if (enqueueNewRequest)
657          {
658             // There's a (partial) new request in the buffer, we need to create a new request
659             request.next = new ProtoRequest();
660             request = request.next;
661             status = State.READING_HEADERS;
662 
663             len = 0;
664             request.data.reserve(1024*10);
665             request.data ~= (cast(char*)(&len))[0..uint.sizeof];
666 
667             bufferRead = leftover;
668             leftover.length = 0;
669 
670             // We try to parse the new request immediately
671             tryParse = true;
672          }
673       }
674 
675    }
676 
677    DataBuffer!char   sendBuffer;
678    size_t            bufferSent;
679    string            requestId;
680 
681    bool              hasQueuedRequests = false;
682    bool              requestDataReceived;
683    bool              isKeepAlive;
684    size_t            responseSent;
685    size_t            responseLength;
686    size_t            id;
687    Socket            clientSkt;
688 
689    ProtoRequest      requestToProcess;
690    WorkerInfo        worker;
691    char[]            leftover;
692 
693    CoarseTime          lastRecv    = CoarseTime.zero;
694    CoarseTime          lastRequest = CoarseTime.zero;
695 
696    static SimpleList          alive;
697    static SimpleList          dead;
698    Communicator.State    status;
699    static Communicator[] instances;
700 }