1 /* 2 Copyright (c) 2023-2024 Andrea Fontana 3 4 Permission is hereby granted, free of charge, to any person 5 obtaining a copy of this software and associated documentation 6 files (the "Software"), to deal in the Software without 7 restriction, including without limitation the rights to use, 8 copy, modify, merge, publish, distribute, sublicense, and/or sell 9 copies of the Software, and to permit persons to whom the 10 Software is furnished to do so, subject to the following 11 conditions: 12 13 The above copyright notice and this permission notice shall be 14 included in all copies or substantial portions of the Software. 15 16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 17 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES 18 OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 19 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT 20 HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 21 WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 22 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR 23 OTHER DEALINGS IN THE SOFTWARE. 24 */ 25 26 module serverino.communicator; 27 28 import serverino.common; 29 import serverino.databuffer; 30 import serverino.daemon : WorkerInfo; 31 import serverino.config : DaemonConfigPtr; 32 import std.socket; 33 34 35 import std.ascii : toLower; 36 import std.string: join; 37 import std.algorithm : strip; 38 import std.conv : text, to; 39 import std.format : format; 40 import std.experimental.logger : log, info, warning; 41 42 extern(C) long syscall(long number, ...); 43 44 /* 45 * The `ProtoRequest` class is a draft of a HTTP request. 46 * Full request will be parsed by the assigned worker. 47 * This is a linked list, so it can be used to store multiple requests. 48 */ 49 package class ProtoRequest 50 { 51 enum Connection 52 { 53 Unknown = "unknown", 54 KeepAlive = "keep-alive", 55 Close = "close" 56 } 57 58 enum HttpVersion 59 { 60 Unknown = "unknown", 61 HTTP_10 = "HTTP/1.0", 62 HTTP_11 = "HTTP/1.1" 63 } 64 65 override string toString() const 66 { 67 string s; 68 s ~= text("VALID: ", isValid, "\n"); 69 s ~= text("VER: ", httpVersion, "\n"); 70 s ~= text("METHOD: ", method, "\n"); 71 s ~= text("PATH: ", path, "\n"); 72 s ~= text("BODY: ", contentLength, "\n"); 73 s ~= text("HEADERS:", "\n"); 74 s ~= (data[uint.sizeof..headersLength]); 75 s ~= "\n"; 76 77 return s; 78 } 79 80 81 bool isValid = false; // First checks on incoming data can invalidate request 82 83 bool headersDone = false; // Headers are read 84 bool expect100 = false; // Should we send 100-continue? 85 86 size_t contentLength = 0; // Content length 87 size_t headersLength; 88 89 char[] method; // HTTP method 90 char[] path; // Request path 91 92 char[] data; // Request data 93 94 ProtoRequest next = null; // Next request in the queue 95 96 Connection connection = Connection.Unknown; 97 HttpVersion httpVersion = HttpVersion.Unknown; 98 } 99 100 /* 101 * The `Communicator` class receives and sends data to the client. 102 * It is also responsible for the first raw parsing of the incoming data. 103 * It communicates thru a unix socket with the assigned worker to process the request.. 104 */ 105 package class Communicator 106 { 107 enum State 108 { 109 READY = 0, // Waiting to be paired with a client 110 PAIRED, // Paired with a client 111 READING_HEADERS, // Reading headers from client 112 READING_BODY, // Reading body from client 113 KEEP_ALIVE // Keep alive if the client wants to 114 } 115 116 DaemonConfigPtr config; 117 118 this(DaemonConfigPtr config) { 119 120 // Every new instance is added to the list of instances 121 id = instances.length; 122 instances ~= this; 123 dead.insertBack(id); 124 this.config = config; 125 126 } 127 128 void setResponseLength(size_t s) { responseLength = s; responseSent = 0; } 129 bool completed() { return responseLength == responseSent; } 130 131 // Unset the client socket and move the communicator to the ready state 132 pragma(inline, true) 133 void unsetClientSocket() 134 { 135 status = State.READY; 136 137 if (this.clientSkt !is null) 138 { 139 alive.remove(id); 140 dead.insertBack(id); 141 this.clientSkt = null; 142 } 143 } 144 145 // Assign a client socket to the communicator and move it to the paired state 146 pragma(inline, true) 147 void setClientSocket(Socket s, ulong requestId) 148 { 149 this.requestId = format("%06s", requestId); 150 151 status = State.PAIRED; 152 153 if (s !is null && this.clientSkt is null) 154 { 155 dead.remove(id); 156 alive.insertFront(id); 157 s.blocking = false; 158 } 159 160 this.clientSkt = s; 161 } 162 163 // Reset the communicator to the initial state and clear the requests queue 164 void reset() 165 { 166 if (clientSkt !is null) 167 { 168 clientSkt.shutdown(SocketShutdown.BOTH); 169 clientSkt.close(); 170 unsetClientSocket(); 171 } 172 173 status = State.READY; 174 responseLength = 0; 175 responseSent = 0; 176 leftover.length = 0; 177 178 // Clear requests queue 179 while(requestToProcess !is null) 180 { 181 auto tmp = requestToProcess; 182 requestToProcess = requestToProcess.next; 183 tmp.next = null; 184 } 185 186 requestToProcess = null; 187 188 requestDataReceived = false; 189 lastRecv = CoarseTime.zero; 190 lastRequest = CoarseTime.zero; 191 192 hasQueuedRequests = false; 193 } 194 195 // If this communicator has a worker assigned, unset it 196 pragma(inline, true) 197 void unsetWorker() 198 { 199 200 if (this.worker !is null) 201 { 202 this.worker.communicator = null; 203 this.worker.setStatus(WorkerInfo.State.IDLING); 204 this.worker = null; 205 } 206 207 responseLength = 0; 208 responseSent = 0; 209 } 210 211 // Assign a worker to the communicator 212 pragma(inline, true) 213 void setWorker(ref WorkerInfo worker) 214 { 215 this.worker = worker; 216 worker.communicator = this; 217 218 worker.setStatus(WorkerInfo.State.PROCESSING); 219 auto current = requestToProcess; 220 uint len = cast(uint)(current.data.length) - cast(uint)uint.sizeof; 221 current.data[0..uint.sizeof] = (cast(char*)&len)[0..uint.sizeof]; 222 223 isKeepAlive = current.connection == ProtoRequest.Connection.KeepAlive; 224 worker.unixSocket.send(current.data); 225 226 requestToProcess = requestToProcess.next; 227 lastRequest = CoarseTime.currTime; 228 } 229 230 // Write the buffered data to the client socket 231 void write() 232 { 233 auto maxToSend = bufferSent + 32*1024; 234 if (maxToSend > sendBuffer.length) maxToSend = sendBuffer.length; 235 236 if (maxToSend == 0) 237 return; 238 239 immutable sent = clientSkt.send(sendBuffer.array[bufferSent..maxToSend]); 240 241 if (sent == Socket.ERROR) 242 { 243 if(!wouldHaveBlocked) 244 { 245 log("Socket Error"); 246 reset(); 247 } 248 } 249 else 250 { 251 bufferSent += sent; 252 responseSent += sent; 253 if(bufferSent == sendBuffer.length) 254 { 255 bufferSent = 0; 256 sendBuffer.clear(); 257 } 258 } 259 260 // If the response is completed, unset the worker 261 // and if the client is not keep alive, reset the communicator 262 if (completed()) 263 { 264 unsetWorker(); 265 266 if (!isKeepAlive) 267 reset(); 268 } 269 } 270 271 // Try to write the data to the client socket, it buffers the data if the socket is not ready 272 void write(char[] data) 273 { 274 if (clientSkt is null) 275 { 276 reset(); 277 return; 278 } 279 280 if (sendBuffer.length == 0) 281 { 282 auto sent = clientSkt.send(data); 283 284 if (sent == Socket.ERROR) 285 { 286 if(!wouldHaveBlocked) 287 { 288 log("Socket error on write. ", lastSocketError); 289 reset(); 290 return; 291 } 292 else sendBuffer.append(data); 293 } 294 else 295 { 296 responseSent += sent; 297 if (sent < data.length) sendBuffer.append(data[sent..data.length]); 298 } 299 300 // If the response is completed, unset the worker 301 // and if the client is not keep alive, reset the communicator 302 if (completed()) 303 { 304 unsetWorker(); 305 306 if (!isKeepAlive) 307 reset(); 308 } 309 } 310 else 311 { 312 sendBuffer.append(data); 313 write(); 314 } 315 } 316 317 // Read the data from the client socket and parse the incoming data 318 void read(bool fromBuffer = false) 319 { 320 import std.string: indexOf; 321 322 // Create a new request if the current one is completed 323 if (status == State.PAIRED || status == State.KEEP_ALIVE) 324 { 325 // Queue a new request 326 if (requestToProcess is null) requestToProcess = new ProtoRequest(); 327 else 328 { 329 ProtoRequest tmp = requestToProcess; 330 while(tmp.next !is null) 331 tmp = tmp.next; 332 333 tmp.next = new ProtoRequest(); 334 requestToProcess = tmp.next; 335 } 336 337 status = State.READING_HEADERS; 338 } 339 340 ProtoRequest request = requestToProcess; 341 342 // First 2 bytes are the length of the request 343 // They will be overwritten by the actual length of the request 344 // and sent to the worker through the unix socket 345 uint len = 0; 346 if(request.data.length == 0) 347 { 348 request.data ~= (cast(char*)(&len))[0..uint.sizeof]; 349 } 350 351 char[32*1024] buffer; 352 ptrdiff_t bytesRead = 0; 353 354 // Read the data from the client socket if it's not buffered 355 // Set the started flag to true if the first data is read to check for timeouts 356 if (!fromBuffer) 357 { 358 bytesRead = clientSkt.receive(buffer); 359 360 if (bytesRead < 0) 361 { 362 status = State.READY; 363 log("Socket error on read. ", lastSocketError); 364 reset(); 365 return; 366 } 367 368 if (bytesRead == 0) 369 { 370 // Connection closed. 371 status = State.READY; 372 reset(); 373 return; 374 } 375 else if (requestDataReceived == false) requestDataReceived = true; 376 377 } 378 else 379 { 380 bytesRead = 0; 381 requestDataReceived = true; 382 } 383 384 auto bufferRead = buffer[0..bytesRead]; 385 386 // If there's leftover data from the previous read, append it to the current buffer 387 if (leftover.length) 388 { 389 bufferRead = leftover ~ bufferRead; 390 leftover.length = 0; 391 } 392 393 bool tryParse = true; 394 while(tryParse) 395 { 396 bool enqueueNewRequest = false; 397 tryParse = false; 398 399 if (status == State.READING_HEADERS) 400 { 401 // We are still waiting for the headers to be completed 402 if (!request.headersDone) 403 { 404 auto headersEnd = bufferRead.indexOf("\r\n\r\n"); 405 406 // Are the headers completed? 407 if (headersEnd >= 0) 408 { 409 if (headersEnd > config.maxRequestSize) 410 { 411 clientSkt.send("HTTP/1.0 413 Request Entity Too Large\r\n"); 412 reset(); 413 return; 414 } 415 416 import std.algorithm : splitter, map, joiner; 417 418 // Extra data after the headers is stored in the leftover buffer 419 request.data ~= bufferRead[0..headersEnd]; 420 leftover = bufferRead[headersEnd+4..$]; 421 422 // The headers are completed 423 bufferRead.length = 0; 424 request.headersDone = true; 425 request.isValid = true; 426 427 auto firstLine = request.data.indexOf("\r\n"); 428 429 // HACK: A single line (http 1.0?) request. 430 if (firstLine < 0) 431 { 432 firstLine = request.data.length; 433 request.data ~= "\r\n"; 434 } 435 436 if (firstLine < 18) 437 { 438 request.isValid = false; 439 clientSkt.send("HTTP/1.0 400 Bad Request\r\n"); 440 debug warning("Bad Request. Request line too short."); 441 reset(); 442 return; 443 } 444 445 auto fields = request.data[uint.sizeof..firstLine].splitter(' '); 446 size_t popped = 0; 447 448 if (!fields.empty) 449 { 450 request.method = fields.front; 451 fields.popFront; 452 popped++; 453 } 454 455 if (!fields.empty) 456 { 457 request.path = fields.front; 458 fields.popFront; 459 popped++; 460 } 461 462 if (!fields.empty) 463 { 464 request.httpVersion = cast(ProtoRequest.HttpVersion)fields.front; 465 fields.popFront; 466 popped++; 467 } 468 469 // HTTP version must be 1.0 or 1.1 470 if (request.httpVersion != ProtoRequest.HttpVersion.HTTP_10 && request.httpVersion != ProtoRequest.HttpVersion.HTTP_11) 471 { 472 request.isValid = false; 473 clientSkt.send("HTTP/1.0 400 Bad Request\r\n"); 474 debug warning("Bad Request. Http version unknown."); 475 reset(); 476 return; 477 } 478 479 if (popped != 3 || !fields.empty) 480 { 481 request.isValid = false; 482 clientSkt.send("HTTP/1.0 400 Bad Request\r\n"); 483 debug warning("Bad Request. Malformed request line."); 484 reset(); 485 return; 486 } 487 488 if (request.path[0] != '/') 489 { 490 request.isValid = false; 491 clientSkt.send("HTTP/1.0 400 Bad Request\r\n"); 492 debug warning("Bad Request. Absolute uri?"); 493 reset(); 494 return; 495 } 496 497 // Parse headers for 100-continue, content-length and connection 498 auto hdrs = request.data[firstLine+2..$] 499 .splitter("\r\n") 500 .map!((char[] row) 501 { 502 if (!request.isValid) 503 return (char[]).init; 504 505 auto headerColon = row.indexOf(':'); 506 507 if (headerColon < 0) 508 { 509 request.isValid = false; 510 return (char[]).init; 511 } 512 513 // Headers keys are case insensitive, so we lowercase them 514 // We strip the leading and trailing spaces from both key and value 515 char[] key = cast(char[])row[0..headerColon].strip!(x =>x==' ' || x=='\t'); 516 char[] value = cast(char[])row[headerColon+1..$].strip!(x =>x==' '|| x=='\t'); 517 518 // Fast way to lowercase the key. Check if it is ASCII only. 519 foreach(idx, ref k; key) 520 { 521 if (k > 0xF9) 522 { 523 request.isValid = false; 524 return (char[]).init; 525 } 526 else if (k >= 'A' && k <= 'Z') 527 k |= 32; 528 } 529 530 foreach(idx, ref k; value) 531 { 532 if (k > 0xF9) 533 { 534 request.isValid = false; 535 return (char[]).init; 536 } 537 } 538 539 if (key.length == 0 || value.length == 0) 540 { 541 request.isValid = false; 542 return (char[]).init; 543 } 544 545 // 100-continue 546 if (key == "expect" && value.length == 12 && value[0..4] == "100-") request.expect100 = true; 547 else if (key == "connection") 548 { 549 import std.uni: sicmp; 550 551 if (sicmp(value, "keep-alive") == 0) request.connection = ProtoRequest.Connection.KeepAlive; 552 else if (sicmp(value, "close") == 0) request.connection = ProtoRequest.Connection.Close; 553 else request.connection = ProtoRequest.connection.Unknown; 554 } 555 else 556 try { if (key == "content-length") request.contentLength = value.to!size_t; } 557 catch (Exception e) { request.isValid = false; return (char[]).init; } 558 559 return key ~ ":" ~ value; 560 }) 561 .join("\r\n") ~ "\r\n\r\n"; 562 563 564 request.data.length = firstLine+2 + hdrs.length; 565 566 // If required by configuration, add the remote ip to the headers 567 // It is disabled by default, as it is a slow operation and it is not always needed 568 string ra = string.init; 569 if(config.withRemoteIp) 570 { 571 ra = "x-remote-ip:" ~ clientSkt.remoteAddress().toAddrString() ~ "\r\n"; 572 request.data.length += ra.length; 573 request.data[firstLine+2..firstLine+2+ra.length] = ra; 574 } 575 576 request.data[firstLine+2+ra.length..$] = hdrs[0..$]; 577 578 if (request.isValid == false) 579 { 580 clientSkt.send("HTTP/1.0 400 Bad Request\r\n"); 581 debug warning("Bad Request. Malformed request."); 582 reset(); 583 return; 584 } 585 586 // Keep alive is the default for HTTP/1.1, close for HTTP/1.0 587 if (request.connection == ProtoRequest.Connection.Unknown) 588 { 589 if (request.httpVersion == ProtoRequest.HttpVersion.HTTP_11) request.connection = ProtoRequest.Connection.KeepAlive; 590 else request.connection = ProtoRequest.Connection.Close; 591 } 592 593 request.headersLength = request.data.length; 594 595 // If the request has a body, we need to read it 596 if (request.contentLength != 0) 597 { 598 if (request.headersLength + request.contentLength > config.maxRequestSize) 599 { 600 clientSkt.send("HTTP/1.0 413 Request Entity Too Large\r\n"); 601 reset(); 602 return; 603 } 604 605 request.data.reserve(request.headersLength + request.contentLength); 606 request.isValid = false; 607 tryParse = true; 608 status = State.READING_BODY; 609 610 // If required, we send the 100-continue response now 611 if (request.expect100) 612 clientSkt.send(cast(char[])(request.httpVersion ~ " 100 continue\r\n\r\n")); 613 } 614 else 615 { 616 // No body, we can process the request 617 requestDataReceived = false; 618 hasQueuedRequests = leftover.length > 0; 619 enqueueNewRequest = hasQueuedRequests; 620 621 if (request.connection == ProtoRequest.Connection.KeepAlive) status = State.KEEP_ALIVE; 622 else status = State.READY; 623 624 } 625 } 626 else leftover = bufferRead.dup; 627 } 628 629 } 630 else if (status == State.READING_BODY) 631 { 632 // We are reading the body of the request 633 request.data ~= leftover; 634 request.data ~= bufferRead; 635 636 leftover.length = 0; 637 bufferRead.length = 0; 638 639 if (request.data.length >= request.headersLength + request.contentLength) 640 { 641 // We read the whole body, process the request 642 requestDataReceived = false; 643 leftover = request.data[request.headersLength + request.contentLength..$]; 644 request.data = request.data[0..request.headersLength + request.contentLength]; 645 request.isValid = true; 646 647 hasQueuedRequests = leftover.length > 0; 648 enqueueNewRequest = hasQueuedRequests; 649 650 if (request.connection == ProtoRequest.Connection.KeepAlive) status = State.KEEP_ALIVE; 651 else status = State.READY; 652 653 } 654 } 655 656 if (enqueueNewRequest) 657 { 658 // There's a (partial) new request in the buffer, we need to create a new request 659 request.next = new ProtoRequest(); 660 request = request.next; 661 status = State.READING_HEADERS; 662 663 len = 0; 664 request.data.reserve(1024*10); 665 request.data ~= (cast(char*)(&len))[0..uint.sizeof]; 666 667 bufferRead = leftover; 668 leftover.length = 0; 669 670 // We try to parse the new request immediately 671 tryParse = true; 672 } 673 } 674 675 } 676 677 DataBuffer!char sendBuffer; 678 size_t bufferSent; 679 string requestId; 680 681 bool hasQueuedRequests = false; 682 bool requestDataReceived; 683 bool isKeepAlive; 684 size_t responseSent; 685 size_t responseLength; 686 size_t id; 687 Socket clientSkt; 688 689 ProtoRequest requestToProcess; 690 WorkerInfo worker; 691 char[] leftover; 692 693 CoarseTime lastRecv = CoarseTime.zero; 694 CoarseTime lastRequest = CoarseTime.zero; 695 696 static SimpleList alive; 697 static SimpleList dead; 698 Communicator.State status; 699 static Communicator[] instances; 700 }