/*
Copyright (c) 2023-2024 Andrea Fontana

Permission is hereby granted, free of charge, to any person
obtaining a copy of this software and associated documentation
files (the "Software"), to deal in the Software without
restriction, including without limitation the rights to use,
copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the
Software is furnished to do so, subject to the following
conditions:

The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
*/

module serverino.daemon;

import serverino.common;
import serverino.communicator;
import serverino.config;

import std.stdio : File;
import std.conv : to;
import std.experimental.logger : log, info, warning;
import std.process : ProcessPipes;

import std.format : format;
import std.socket;
import std.array : array;
import std.algorithm : filter;
import std.datetime : SysTime, Clock, dur;


// WorkerInfo keeps track of one worker process: its state, the process handle
// and the unix socket the daemon uses to talk to it.
// Every instance registers itself in the static `instances` array (its `id` is
// the index in that array) and in one of the per-state `lookup` lists.
package class WorkerInfo
{
   enum State
   {
      IDLING = 0, // Worker is waiting for a request.
      PROCESSING, // Worker is processing a request.
      STOPPED     // Worker is stopped.
   }

   // New worker instances are set to STOPPED and added to the lookup table.
   // No process is started here: see init().
   this()
   {
      this.id = instances.length;
      instances ~= this;

      status = State.STOPPED;
      statusChangedAt = Clock.currTime();
      lookup[State.STOPPED].insertBack(id);
   }

   // Start the worker process and connect to it.
   // Must be called on a STOPPED worker; on success the worker is IDLING.
   // Blocks on accept() until the spawned process connects back.
   void init()
   {
      assert(status == State.STOPPED);

      // Release whatever is left from a previous run (process, socket, communicator).
      clear();

      import std.process : pipeProcess, Redirect, Config;
      import std.file : thisExePath;
      import std.uuid : randomUUID;

      // Create a new socket and bind it to a random address.
      auto uuid = randomUUID().toString();

      // A unix socket is used on every platform, but the address differs.
      version(linux)
      {
         // The address starts with a NUL byte, so no file is created on disk.
         string socketAddress = "SERVERINO_SOCKET/" ~ uuid;
         Socket s = new Socket(AddressFamily.UNIX, SocketType.STREAM);
         s.bind(new UnixAddress("\0%s".format(socketAddress)));
      }
      else
      {
         // ... on windows and macos we use a file inside the temporary directory.
         // The file is not removed by this function.
         import std.path : buildPath;
         import std.file : tempDir;
         string socketAddress = buildPath(tempDir, uuid);
         Socket s = new Socket(AddressFamily.UNIX, SocketType.STREAM);
         s.bind(new UnixAddress(socketAddress));
      }

      // The listening socket serves exactly one connection (the worker we are about
      // to spawn): close it when we are done instead of leaving it to the GC.
      scope(exit) s.close();

      s.listen(1);

      // We start a new process and pass the socket address to it.
      auto env = Daemon.instance.workerEnvironment.dup;
      env["SERVERINO_SOCKET"] = socketAddress;

      // The worker is this same executable, started detached.
      auto pipes = pipeProcess(thisExePath(), Redirect.stdin, env, Config.detached);

      Socket accepted = s.accept();

      // NOTE(review): this switches the *listening* socket to non-blocking, not
      // `accepted`. Kept as it was; confirm which socket was meant.
      s.blocking = false;

      this.pi = new ProcessInfo(pipes.pid.processID);
      this.unixSocket = accepted;

      setStatus(WorkerInfo.State.IDLING);
   }

   ~this()
   {
      clear();
   }

   // Kill the worker process (if any), close the unix socket (if any) and detach
   // the communicator. Only valid on a STOPPED worker.
   void clear()
   {
      assert(status == State.STOPPED);

      if (this.pi) this.pi.kill();

      if (this.unixSocket)
      {
         unixSocket.shutdown(SocketShutdown.BOTH);
         unixSocket.close();
         unixSocket = null;
      }

      communicator = null;
   }

   // Move the worker to state `s`, keeping the per-state lookup lists in sync and
   // recording when the change happened.
   // Setting the current state again is only allowed for PROCESSING.
   void setStatus(State s)
   {
      import std.conv : to;
      assert(s!=status || s == State.PROCESSING, id.to!string ~ " > Trying to change WorkerInfo status from " ~ status.to!string ~ " to " ~ s.to!string);

      if (s!=status)
      {
         lookup[status].remove(id);
         lookup[s].insertBack(id);
      }

      status = s;
      statusChangedAt = Clock.currTime();
   }

   package:

   size_t            id;                // Index of this worker inside `instances`.
   ProcessInfo       pi;                // Handle of the worker process, set by init().

   SysTime           statusChangedAt;   // Time of the last setStatus() call.

   State             status         = State.STOPPED;
   Socket            unixSocket     = null;  // Connection with the worker process.
   Communicator      communicator   = null;  // Communicator currently served by this worker, if any.

   static WorkerInfo[]   instances;     // Every worker ever created, indexed by id.
   static SimpleList[3]  lookup;        // Worker ids grouped by State (indexed by the enum value).
}

version(Posix)
{
   // Signal handler installed for SIGINT/SIGTERM by Daemon.wake().
   // The first signal only asks the main loop to stop; if a signal arrives while
   // an exit is already pending, the process is terminated immediately.
   extern(C) void serverino_exit_handler(int num) nothrow @nogc @system
   {
      import core.stdc.stdlib : exit;
      if (Daemon.exitRequested) exit(-1);
      else Daemon.exitRequested = true;
   }
}

// The Daemon struct is the core of serverino: it owns the listeners, spawns the
// workers and runs the select() loop that moves data between clients and workers.
struct Daemon
{
   // True once the main loop has completed its first worker check.
   static auto isReady() { return ready; }

   // Returns the singleton instance of Daemon, creating it on first use.
   static auto instance()
   {
      static Daemon* _instance;
      if (_instance is null) _instance = new Daemon();
      return _instance;
   }

   // Create a lazy list of the ids of the workers that are alive (idling or processing).
   pragma(inline, true)
   auto ref workersAlive()
   {
      import std.range : chain;
      return chain(
         WorkerInfo.lookup[WorkerInfo.State.IDLING].asRange,
         WorkerInfo.lookup[WorkerInfo.State.PROCESSING].asRange
      );
   }

   // Create a lazy list of the ids of the stopped workers, the ones we can (re)start.
   pragma(inline, true)
   auto ref workersDead()
   {
      return WorkerInfo.lookup[WorkerInfo.State.STOPPED].asRange;
   }


   // Start the daemon and run its main loop until an exit is requested.
   //
   // Modules: modules scanned for functions marked with @onDaemonStart / @onDaemonStop.
   // config:  daemon configuration; listeners, listenerBacklog, minWorkers, maxWorkers
   //          and maxHttpWaiting are read here.
   //
   // This function does not return to the caller: it ends by calling exit().
   void wake(Modules...)(DaemonConfigPtr config)
   {
      import serverino.interfaces : Request;
      import std.process : environment, thisProcessID;
      import std.stdio;

      // Environment handed to every worker: a copy of ours plus the daemon pid and
      // the build hash.
      workerEnvironment = environment.toAA();
      workerEnvironment["SERVERINO_DAEMON"] = thisProcessID.to!string;
      workerEnvironment["SERVERINO_BUILD"] = Request.simpleNotSecureCompileTimeHash();

      info("Daemon started.");

      version(Posix)
      {
         // SIGINT and SIGTERM request a clean exit (see serverino_exit_handler).
         import core.sys.posix.signal;
         sigaction_t act = { sa_handler: &serverino_exit_handler };
         sigaction(SIGINT, &act, null);
         sigaction(SIGTERM, &act, null);
      }
      else version(Windows) scope(exit) tryUninit!Modules();

      // Call the onDaemonStart functions.
      tryInit!Modules();

      // Starting all the listeners.
      foreach(ref listener; config.listeners)
      {
         listener.socket = new TcpSocket(listener.address.addressFamily);
         version(Windows) { } else { listener.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true); }

         try
         {
            listener.socket.bind(listener.address);
            listener.socket.listen(config.listenerBacklog);
            info("Listening on http://%s/".format(listener.socket.localAddress.toString));
         }
         catch (SocketException se)
         {
            import std.experimental.logger : critical;
            import core.stdc.stdlib : exit, EXIT_FAILURE;
            import std.stdio : stderr;

            critical("Can't listen on http://%s/. Are you allowed to listen on this port? Is port already used by another process?".format(listener.address.toString));

            // One listener failed: shut down the ones already created and give up.
            foreach(ref l; config.listeners)
            {
               if (l.socket !is null)
               {
                  l.socket.shutdown(SocketShutdown.BOTH);
               }
            }

            exit(EXIT_FAILURE);
         }
      }

      // Workers: one (empty) lookup list per state, then maxWorkers stopped workers.
      import std.traits : EnumMembers;
      static foreach(e; EnumMembers!(WorkerInfo.State))
      {
         WorkerInfo.lookup[e] = SimpleList();
      }

      foreach(i; 0..config.maxWorkers)
         new WorkerInfo();

      Communicator.alive = SimpleList();
      Communicator.dead = SimpleList();

      // Pre-allocate a pool of communicators.
      foreach(idx; 0..128)
         new Communicator(config);

      // We use a socketset to check for updates
      SocketSet ssRead = new SocketSet(config.listeners.length + WorkerInfo.instances.length);
      SocketSet ssWrite = new SocketSet(128);

      while(!exitRequested)
      {
         // Create workers if needed. Kills old workers, frozen ones, etc.
         checkWorkers(config);

         ready = true;

         ssRead.reset();
         ssWrite.reset();

         // Fill socketSet with listeners, waiting for new connections.
         foreach(ref listener; config.listeners)
            ssRead.add(listener.socket);

         // Fill socketSet with workers, waiting updates.
         foreach(idx; workersAlive)
            ssRead.add(WorkerInfo.instances[idx].unixSocket);

         // Fill socketSet with communicators, waiting for updates.
         foreach(idx; Communicator.alive.asRange)
         {
            ssRead.add(Communicator.instances[idx].clientSkt);

            // We only care about writability while there is still something to send.
            if (!Communicator.instances[idx].completed)
               ssWrite.add(Communicator.instances[idx].clientSkt);
         }

         // Wait (one second at most) for something to happen.
         // `updates` stays negative if select() throws.
         long updates = -1;
         try { updates = Socket.select(ssRead, ssWrite, null, 1.dur!"seconds"); }
         catch (SocketException se) {
            import std.experimental.logger : warning;
            warning("Exception: ", se.msg);
         }

         // Check for timeouts.
         immutable now = CoarseTime.currTime;
         {
            static CoarseTime lastCheck = CoarseTime.zero;

            // Timeouts are evaluated once per second at most.
            if (now-lastCheck >= 1.dur!"seconds")
            {
               lastCheck = now;

               // A list of communicators that hit the timeout.
               // They are collected first and reset after the loop over the alive list.
               Communicator[] toReset;

               foreach(idx; Communicator.alive.asRange)
               {
                  auto communicator = Communicator.instances[idx];

                  // Keep-alive timeout hit.
                  if (communicator.status == Communicator.State.KEEP_ALIVE && communicator.lastRequest != CoarseTime.zero && now - communicator.lastRequest > 5.dur!"seconds")
                     toReset ~= communicator;

                  // Http timeout hit.
                  else if (communicator.status == Communicator.State.PAIRED || communicator.status == Communicator.State.READING_BODY || communicator.status == Communicator.State.READING_HEADERS )
                  {
                     if (communicator.lastRecv != CoarseTime.zero && now - communicator.lastRecv > config.maxHttpWaiting)
                     {
                        if (communicator.requestDataReceived)
                        {
                           debug warning("Connection closed. [REASON: http timeout]");

                           // The empty line terminates the (empty) header section:
                           // without it the response is not a complete HTTP message.
                           communicator.clientSkt.send("HTTP/1.0 408 Request Timeout\r\n\r\n");
                        }
                        toReset ~= communicator;
                     }
                  }
               }

               // Reset the communicators that hit the timeout.
               foreach(communicator; toReset)
                  communicator.reset();

            }

            // Free dead communicators.
            // When the pool grew past 1024 entries, drop the last instance if it is
            // dead and not one of the first ones. One instance per iteration at most.
            if (Communicator.instances.length > 1024)
               foreach(deadIdx; Communicator.dead.asRange)
               {
                  if (deadIdx == Communicator.instances.length - 1 && deadIdx > 128)
                  {
                     Communicator.dead.remove(deadIdx);
                     Communicator.instances.length--;
                     break;
                  }
               }
         }

         // A negative value ends the main loop; zero means nothing to do.
         if (updates < 0) break;
         else if (updates == 0) continue;

         if (exitRequested)
            break;

         auto wa = workersAlive;

         // Check the workers for updates
         while(!wa.empty)
         {
            if (updates == 0)
               break;

            // The range is advanced before the worker is handled: the code below may
            // move the worker to another lookup list through setStatus().
            auto idx = wa.front;
            wa.popFront;

            WorkerInfo worker = WorkerInfo.instances[idx];
            Communicator communicator = worker.communicator;

            if (ssRead.isSet(worker.unixSocket))
            {
               updates--;

               // Data from a worker nobody is waiting for: stop it.
               if (communicator is null)
               {
                  worker.pi.kill();
                  worker.setStatus(WorkerInfo.State.STOPPED);
                  continue;
               }

               ubyte[32*1024] buffer;
               auto bytes = worker.unixSocket.receive(buffer);

               if (bytes == Socket.ERROR)
               {
                  debug warning("Error: worker killed?");
                  worker.setStatus(WorkerInfo.State.STOPPED);
                  worker.clear();
                  communicator.reset();
               }
               else if (bytes == 0)
               {
                  // The worker closed its side of the socket.
                  worker.setStatus(WorkerInfo.State.STOPPED);
                  worker.clear();

                  // Close the client connection too.
                  if (communicator.clientSkt !is null && communicator.clientSkt.isAlive)
                     communicator.clientSkt.shutdown(SocketShutdown.BOTH);
               }
               else
               {
                  if (communicator.responseLength == 0)
                  {
                     // First chunk of a response: it starts with a WorkerPayload
                     // header, followed by the first bytes of the response itself.
                     WorkerPayload *wp = cast(WorkerPayload*)buffer.ptr;

                     communicator.isKeepAlive = wp.isKeepAlive;
                     communicator.setResponseLength(wp.contentLength);
                     communicator.write(cast(char[])buffer[WorkerPayload.sizeof..bytes]);
                  }
                  else communicator.write(cast(char[])buffer[0..bytes]);
               }
            }

         }

         // Check the communicators for updates
         foreach(idx; Communicator.alive.asRange)
         {
            auto communicator = Communicator.instances[idx];

            if(communicator.clientSkt is null)
               continue;

            if (ssRead.isSet(communicator.clientSkt))
            {
               updates--;
               communicator.lastRecv = now;
               communicator.read();
            }
            else if(communicator.hasQueuedRequests)
            {
               // Nothing new on the socket, but there are requests still queued.
               communicator.lastRecv = now;
               communicator.read(true);
            }

            // read() may have dropped the client socket: check it again.
            if (updates > 0 && communicator.clientSkt !is null && ssWrite.isSet(communicator.clientSkt))
            {
               updates--;
               communicator.write();
            }
         }

         // Check for communicators that need a worker.
         foreach(ref communicator; Communicator.instances.filter!(x=>x.requestToProcess !is null && x.requestToProcess.isValid && x.worker is null))
         {
            auto workers = WorkerInfo.lookup[WorkerInfo.State.IDLING].asRange;

            // Prefer a worker that is already running ...
            if (!workers.empty) communicator.setWorker(WorkerInfo.instances[workers.front]);
            else {
               auto dead = workersDead();

               if (!dead.empty)
               {
                  // ... otherwise start a stopped one. The id is read once, before
                  // init() moves the worker out of the STOPPED list.
                  auto deadIdx = dead.front;
                  WorkerInfo.instances[deadIdx].init;
                  communicator.setWorker(WorkerInfo.instances[deadIdx]);
               }
               else break; // All workers are busy. Will try again later.
            }
         }

         // Check for new incoming connections.
         foreach(ref listener; config.listeners)
         {

            if (updates == 0)
               break;


            if (ssRead.isSet(listener.socket))
            {
               updates--;

               // We have an incoming connection to handle
               Communicator communicator;

               // First: check if any unused communicator is available
               auto idling = Communicator.dead.asRange;

               if (!idling.empty) communicator = Communicator.instances[idling.front];
               else communicator = new Communicator(config);

               communicator.lastRecv = now;

               auto nextId = requestId++;
               communicator.setClientSocket(listener.socket.accept(), nextId);
            }
         }

      }

      // Exit requested, shutdown everything.

      // Close all the listeners.
      foreach(ref listener; config.listeners)
      {
         listener.socket.shutdown(SocketShutdown.BOTH);
         listener.socket.close();
      }

      // Kill all the workers.
      foreach(ref idx; workersAlive)
      {
         WorkerInfo worker = WorkerInfo.instances[idx];

         // Best effort: we are exiting anyway, errors are ignored on purpose.
         try
         {
            if (worker)
            {
               if (worker.unixSocket) worker.unixSocket.shutdown(SocketShutdown.BOTH);
               if (worker.pi) worker.pi.kill();
            }
         }
         catch (Exception e) { }
      }

      // Call the onDaemonStop functions.
      tryUninit!Modules();

      // Force exit.
      import core.stdc.stdlib : exit;
      exit(0);
   }

   // Ask the main loop to stop.
   void shutdown() @nogc nothrow { exitRequested = true; }

private:

   // Stop the alive workers whose socket is no longer alive, then start stopped
   // workers until at least config.minWorkers are alive (or none is left to start).
   void checkWorkers(DaemonConfigPtr config)
   {
      foreach(k; workersAlive)
      {
         auto worker = WorkerInfo.instances[k];

         if (!worker.unixSocket.isAlive)
         {
            log("Killing ", worker.pi.id, ". Invalid state.");
            worker.pi.kill();
            worker.setStatus(WorkerInfo.State.STOPPED);
         }

      }

      while (
         WorkerInfo.lookup[WorkerInfo.State.IDLING].length +
         WorkerInfo.lookup[WorkerInfo.State.PROCESSING].length < config.minWorkers
      )
      {
         auto dead = workersDead();

         if (dead.empty) break;

         auto idx = dead.front();
         WorkerInfo.instances[idx].init;
      }

   }

   ulong          requestId = 0;       // Id given to the next accepted connection.
   string[string] workerEnvironment;   // Environment passed to the workers, set by wake().

   __gshared bool exitRequested = false;  // Set by shutdown() and by the signal handler.
   __gshared bool ready = false;          // Set once the main loop is running.
}


// Call every function marked with @onDaemonStart found in Modules.
// Compilation fails if a marked symbol is not a function or can't be called
// without arguments.
void tryInit(Modules...)()
{
   import std.traits : getSymbolsByUDA, isFunction;

   static foreach(m; Modules)
   {
      static foreach(f; getSymbolsByUDA!(m, onDaemonStart))
      {{
         static assert(isFunction!f, "`" ~ __traits(identifier, f) ~ "` is marked with @onDaemonStart but it is not a function");

         static if (__traits(compiles, f())) f();
         else static assert(0, "`" ~ __traits(identifier, f) ~ "` is marked with @onDaemonStart but it is not callable");

      }}
   }
}

// Call every function marked with @onDaemonStop found in Modules.
// Compilation fails if a marked symbol is not a function or can't be called
// without arguments.
void tryUninit(Modules...)()
{
   import std.traits : getSymbolsByUDA, isFunction;

   static foreach(m; Modules)
   {
      static foreach(f; getSymbolsByUDA!(m, onDaemonStop))
      {{
         static assert(isFunction!f, "`" ~ __traits(identifier, f) ~ "` is marked with @onDaemonStop but it is not a function");

         static if (__traits(compiles, f())) f();
         else static assert(0, "`" ~ __traits(identifier, f) ~ "` is marked with @onDaemonStop but it is not callable");

      }}
   }
}