1 /*
2 Copyright (c) 2023-2024 Andrea Fontana
3 
4 Permission is hereby granted, free of charge, to any person
5 obtaining a copy of this software and associated documentation
6 files (the "Software"), to deal in the Software without
7 restriction, including without limitation the rights to use,
8 copy, modify, merge, publish, distribute, sublicense, and/or sell
9 copies of the Software, and to permit persons to whom the
10 Software is furnished to do so, subject to the following
11 conditions:
12 
13 The above copyright notice and this permission notice shall be
14 included in all copies or substantial portions of the Software.
15 
16 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17 EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
18 OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19 NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
20 HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
21 WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
23 OTHER DEALINGS IN THE SOFTWARE.
24 */
25 
26 module serverino.daemon;
27 
28 import serverino.common;
29 import serverino.communicator;
30 import serverino.config;
31 
32 import std.stdio : File;
33 import std.conv : to;
34 import std.experimental.logger : log, info, warning;
35 import std.process : ProcessPipes;
36 
37 import std.format : format;
38 import std.socket;
39 import std.array : array;
40 import std.algorithm : filter;
41 import std.datetime : SysTime, Clock, dur;
42 
43 
// WorkerInfo tracks one worker slot: the child process, the unix socket used
// to talk to it, the communicator it is currently serving and its state.
// Every instance registers itself in the static `instances` array and in the
// per-state `lookup` lists, which store worker ids (indexes into `instances`).
package class WorkerInfo
{
   enum State
   {
      IDLING = 0, // Worker is waiting for a request.
      PROCESSING, // Worker is processing a request.
      STOPPED     // Worker is stopped.
   }

   // New worker instances are set to STOPPED and added to the lookup table.
   // The id of a worker is its position inside `instances`.
   this()
   {
      this.id = instances.length;
      instances ~= this;

      status = State.STOPPED;
      statusChangedAt = Clock.currTime();
      lookup[State.STOPPED].insertBack(id);
   }

   // (Re)starts the worker process behind this slot.
   // The slot must be STOPPED. A fresh unix socket is created, the current
   // executable is spawned as a detached process with the socket address in
   // the SERVERINO_SOCKET environment variable, and the call blocks until the
   // worker connects back. On success the slot becomes IDLING.
   void init()
   {
      assert(status == State.STOPPED);

      // Set default status.
      clear();

      import std.process : pipeProcess, Redirect, Config;
      import std.file : thisExePath;
      import std.uuid : randomUUID;

      // Create a new socket and bind it to a random address.
      auto uuid = randomUUID().toString();

      // We use a unix socket on both linux and macos/windows but ...
      version(linux)
      {
         // ... on linux the address starts with a NUL byte.
         string socketAddress = "SERVERINO_SOCKET/" ~ uuid;
         Socket s = new Socket(AddressFamily.UNIX, SocketType.STREAM);
         s.bind(new UnixAddress("\0%s".format(socketAddress)));
      }
      else
      {
         // ... on windows and macos we use a temporary file.
         import std.path : buildPath;
         import std.file : tempDir;
         string socketAddress = buildPath(tempDir, uuid);
         Socket s = new Socket(AddressFamily.UNIX, SocketType.STREAM);
         s.bind(new UnixAddress(socketAddress));
      }

      // The listening socket is only needed until the worker has connected.
      // Close it on every exit path (including a failing listen/accept/spawn)
      // instead of leaving one open descriptor per worker start to the GC.
      scope(exit) s.close();

      s.listen(1);

      // We start a new process and pass the socket address to it.
      auto env = Daemon.instance.workerEnvironment.dup;
      env["SERVERINO_SOCKET"] = socketAddress;

      auto pipes = pipeProcess(thisExePath(), Redirect.stdin, env, Config.detached);

      // Blocks until the freshly started worker connects back.
      Socket accepted = s.accept();

      this.pi = new ProcessInfo(pipes.pid.processID);
      this.unixSocket = accepted;

      setStatus(WorkerInfo.State.IDLING);
   }

   // NOTE(review): clear() dereferences other GC-managed objects (pi,
   // unixSocket); when a destructor runs from the GC those objects may already
   // have been finalized. Instances are kept alive by `instances`, so this is
   // presumably never hit in practice - confirm before relying on it.
   ~this()
   {
      clear();
   }

   // Releases everything attached to a STOPPED slot: kills the process (if
   // any), shuts down and closes the unix socket and drops the communicator.
   void clear()
   {
      assert(status == State.STOPPED);

      if (this.pi) this.pi.kill();

      if (this.unixSocket)
      {
         unixSocket.shutdown(SocketShutdown.BOTH);
         unixSocket.close();
         unixSocket = null;
      }

      communicator = null;
   }

   // Moves the worker to state `s`, keeping the per-state lookup lists in sync
   // and refreshing `statusChangedAt`. Setting the current state again is only
   // allowed for PROCESSING (it just refreshes the timestamp).
   void setStatus(State s)
   {
      import std.conv : to;
      assert(s!=status || s == State.PROCESSING, id.to!string ~ " > Trying to change WorkerInfo status from " ~ status.to!string ~ " to " ~ s.to!string);

      if (s!=status)
      {
         lookup[status].remove(id);
         lookup[s].insertBack(id);
      }

      status = s;
      statusChangedAt = Clock.currTime();
   }

package:

   size_t                  id;               // Index of this worker inside `instances`.
   ProcessInfo             pi;               // Handle of the spawned worker process.

   SysTime                 statusChangedAt;  // Time of the last setStatus() call.

   State                   status      = State.STOPPED;
   Socket                  unixSocket     = null;   // Connection to the worker process.
   Communicator            communicator = null;     // Communicator served by this worker, if any.

   static WorkerInfo[]   instances;  // Every worker ever created, indexed by id.
   static SimpleList[3]  lookup;     // Worker ids grouped by State.
}
164 
version(Posix)
{
   // Signal handler for daemon termination requests.
   // The first signal only raises Daemon.exitRequested so the main loop can
   // shut down in an orderly way; a second signal received while the flag is
   // already set terminates the process immediately with status -1.
   extern(C) void serverino_exit_handler(int num) nothrow @nogc @system
   {
      // exit() is not async-signal-safe (it runs atexit handlers and flushes
      // stdio, which may deadlock when the signal interrupted such code).
      // _Exit() terminates right away and is safe to call from a handler.
      import core.stdc.stdlib : _Exit;
      if (Daemon.exitRequested) _Exit(-1);
      else Daemon.exitRequested = true;
   }
}
174 
175 // The Daemon class is the core of serverino.
176 struct Daemon
177 {
178    static auto isReady() { return ready; }
179 
180    // The instance method returns the singleton instance of the Daemon class.
181    static auto instance()
182    {
183       static Daemon* _instance;
184       if (_instance is null) _instance = new Daemon();
185       return _instance;
186    }
187 
   // Create a lazy list of the ids of all running workers: the idling ones
   // first, followed by the ones that are processing a request.
   // The ids are indexes into WorkerInfo.instances.
   pragma(inline, true)
   auto ref workersAlive()
   {
      import std.range : chain;
      return chain(
         WorkerInfo.lookup[WorkerInfo.State.IDLING].asRange,
         WorkerInfo.lookup[WorkerInfo.State.PROCESSING].asRange
      );
   }
198 
199    // Create a lazy list of workers we can reuse.
200    pragma(inline, true);
201    auto ref workersDead()
202    {
203       return WorkerInfo.lookup[WorkerInfo.State.STOPPED].asRange;
204    }
205 
206 
207    void wake(Modules...)(DaemonConfigPtr config)
208    {
209       import serverino.interfaces : Request;
210       import std.process : environment, thisProcessID;
211       import std.stdio;
212 
213       workerEnvironment = environment.toAA();
214       workerEnvironment["SERVERINO_DAEMON"] = thisProcessID.to!string;
215       workerEnvironment["SERVERINO_BUILD"] = Request.simpleNotSecureCompileTimeHash();
216 
217       info("Daemon started.");
218 
219       version(Posix)
220       {
221          import core.sys.posix.signal;
222          sigaction_t act = { sa_handler: &serverino_exit_handler };
223 	      sigaction(SIGINT, &act, null);
224          sigaction(SIGTERM, &act, null);
225       }
226       else version(Windows) scope(exit) tryUninit!Modules();
227 
228       tryInit!Modules();
229 
230       // Starting all the listeners.
231       foreach(ref listener; config.listeners)
232       {
233          listener.socket = new TcpSocket(listener.address.addressFamily);
234          version(Windows) { } else { listener.socket.setOption(SocketOptionLevel.SOCKET, SocketOption.REUSEADDR, true); }
235 
236          try
237          {
238             listener.socket.bind(listener.address);
239             listener.socket.listen(config.listenerBacklog);
240             info("Listening on http://%s/".format(listener.socket.localAddress.toString));
241          }
242          catch (SocketException se)
243          {
244             import std.experimental.logger : critical;
245             import core.stdc.stdlib : exit, EXIT_FAILURE;
246             import std.stdio : stderr;
247 
248             critical("Can't listen on http://%s/. Are you allowed to listen on this port? Is port already used by another process?".format(listener.address.toString));
249 
250             foreach(ref l; config.listeners)
251             {
252                if (l.socket !is null)
253                {
254                   l.socket.shutdown(SocketShutdown.BOTH);
255                }
256             }
257 
258             exit(EXIT_FAILURE);
259          }
260       }
261 
262       // Workers
263       import std.traits : EnumMembers;
264       static foreach(e; EnumMembers!(WorkerInfo.State))
265       {
266          WorkerInfo.lookup[e] = SimpleList();
267       }
268 
269       foreach(i; 0..config.maxWorkers)
270          new WorkerInfo();
271 
272       Communicator.alive = SimpleList();
273       Communicator.dead = SimpleList();
274 
275       foreach(idx; 0..128)
276          new Communicator(config);
277 
278       // We use a socketset to check for updates
279       SocketSet ssRead = new SocketSet(config.listeners.length + WorkerInfo.instances.length);
280       SocketSet ssWrite = new SocketSet(128);
281 
282       while(!exitRequested)
283       {
284          // Create workers if needed. Kills old workers, freezed ones, etc.
285          checkWorkers(config);
286 
287          ready = true;
288 
289          ssRead.reset();
290          ssWrite.reset();
291 
292          // Fill socketSet with listeners, waiting for new connections.
293          foreach(ref listener; config.listeners)
294             ssRead.add(listener.socket);
295 
296          // Fill socketSet with workers, waiting updates.
297          foreach(idx; workersAlive)
298             ssRead.add(WorkerInfo.instances[idx].unixSocket);
299 
300          // Fill socketSet with communicators, waiting for updates.
301          foreach(idx; Communicator.alive.asRange)
302          {
303             ssRead.add(Communicator.instances[idx].clientSkt);
304 
305             if (!Communicator.instances[idx].completed)
306                ssWrite.add(Communicator.instances[idx].clientSkt);
307          }
308 
309          long updates = -1;
310          try { updates = Socket.select(ssRead, ssWrite, null, 1.dur!"seconds"); }
311          catch (SocketException se) {
312             import std.experimental.logger : warning;
313             warning("Exception: ", se.msg);
314          }
315 
316          // Check for timeouts.
317          immutable now = CoarseTime.currTime;
318          {
319             static CoarseTime lastCheck = CoarseTime.zero;
320 
321             if (now-lastCheck >= 1.dur!"seconds")
322             {
323                lastCheck = now;
324 
325                // A list of communicators that hit the timeout.
326                Communicator[] toReset;
327 
328                foreach(idx; Communicator.alive.asRange)
329                {
330                   auto communicator = Communicator.instances[idx];
331 
332                   // Keep-alive timeout hit.
333                   if (communicator.status == Communicator.State.KEEP_ALIVE && communicator.lastRequest != CoarseTime.zero && now - communicator.lastRequest > 5.dur!"seconds")
334                      toReset ~= communicator;
335 
336                   // Http timeout hit.
337                   else if (communicator.status == Communicator.State.PAIRED || communicator.status == Communicator.State.READING_BODY || communicator.status == Communicator.State.READING_HEADERS )
338                   {
339                      if (communicator.lastRecv != CoarseTime.zero && now - communicator.lastRecv > config.maxHttpWaiting)
340                      {
341                         if (communicator.requestDataReceived)
342                         {
343                            debug warning("Connection closed. [REASON: http timeout]");
344                            communicator.clientSkt.send("HTTP/1.0 408 Request Timeout\r\n");
345                         }
346                         toReset ~= communicator;
347                      }
348                   }
349                }
350 
351                // Reset the communicators that hit the timeout.
352                foreach(communicator; toReset)
353                   communicator.reset();
354 
355             }
356 
357             // Free dead communicators.
358             if (Communicator.instances.length > 1024)
359             foreach(communicator; Communicator.dead.asRange)
360             {
361                if (communicator == Communicator.instances.length - 1 && communicator > 128)
362                {
363                   Communicator.dead.remove(communicator);
364                   Communicator.instances.length--;
365                   break;
366                }
367             }
368          }
369 
370          if (updates < 0) break;
371          else if (updates == 0) continue;
372 
373          if (exitRequested)
374             break;
375 
376          auto wa = workersAlive;
377          size_t nextIdx;
378 
379          // Check the workers for updates
380          while(!wa.empty)
381          {
382             if (updates == 0)
383                break;
384 
385             auto idx = wa.front;
386             wa.popFront;
387 
388             if (!wa.empty)
389                nextIdx = wa.front;
390 
391             scope(exit) idx = nextIdx;
392 
393             WorkerInfo worker = WorkerInfo.instances[idx];
394             Communicator communicator = worker.communicator;
395 
396             if (ssRead.isSet(worker.unixSocket))
397             {
398                updates--;
399 
400                if (communicator is null)
401                {
402                   worker.pi.kill();
403                   worker.setStatus(WorkerInfo.State.STOPPED);
404                   continue;
405                }
406 
407                ubyte[32*1024] buffer;
408                auto bytes = worker.unixSocket.receive(buffer);
409 
410                if (bytes == Socket.ERROR)
411                {
412                   debug warning("Error: worker killed?");
413                   worker.setStatus(WorkerInfo.State.STOPPED);
414                   worker.clear();
415                   communicator.reset();
416                }
417                else if (bytes == 0)
418                {
419                   worker.setStatus(WorkerInfo.State.STOPPED);
420                   worker.clear();
421 
422                   // User closed socket.
423                   if (communicator !is null && communicator.clientSkt !is null && communicator.clientSkt.isAlive)
424                      communicator.clientSkt.shutdown(SocketShutdown.BOTH);
425                }
426                else
427                {
428                   if (communicator is null)
429                   {
430                      continue;
431                   }
432                   if (communicator.responseLength == 0)
433                   {
434                      WorkerPayload *wp = cast(WorkerPayload*)buffer.ptr;
435 
436                      communicator.isKeepAlive = wp.isKeepAlive;
437                      communicator.setResponseLength(wp.contentLength);
438                      communicator.write(cast(char[])buffer[WorkerPayload.sizeof..bytes]);
439                   }
440                   else communicator.write(cast(char[])buffer[0..bytes]);
441                }
442             }
443 
444          }
445 
446          // Check the communicators for updates
447          foreach(idx; Communicator.alive.asRange)
448          {
449             auto communicator = Communicator.instances[idx];
450 
451             if(communicator.clientSkt is null)
452                continue;
453 
454             if (ssRead.isSet(communicator.clientSkt))
455             {
456                updates--;
457                communicator.lastRecv = now;
458                communicator.read();
459             }
460             else if(communicator.hasQueuedRequests)
461             {
462                communicator.lastRecv = now;
463                communicator.read(true);
464             }
465 
466             if (updates > 0 && communicator.clientSkt !is null && ssWrite.isSet(communicator.clientSkt))
467             {
468                updates--;
469                communicator.write();
470             }
471          }
472 
473          // Check for communicators that need a worker.
474          foreach(ref communicator; Communicator.instances.filter!(x=>x.requestToProcess !is null && x.requestToProcess.isValid && x.worker is null))
475          {
476             auto workers = WorkerInfo.lookup[WorkerInfo.State.IDLING].asRange;
477 
478 
479             if (!workers.empty) communicator.setWorker(WorkerInfo.instances[workers.front]);
480             else {
481                auto dead = workersDead();
482 
483                if (!dead.empty)
484                {
485                   WorkerInfo.instances[dead.front].init;
486                   communicator.setWorker(WorkerInfo.instances[dead.front]);
487                }
488                else break; // All workers are busy. Will try again later.
489             }
490          }
491 
492          // Check for new incoming connections.
493          foreach(ref listener; config.listeners)
494          {
495 
496             if (updates == 0)
497                break;
498 
499 
500             if (ssRead.isSet(listener.socket))
501             {
502                updates--;
503 
504                // We have an incoming connection to handle
505                Communicator communicator;
506 
507                // First: check if any idling communicator is available
508                auto idling = Communicator.dead.asRange;
509 
510                if (!idling.empty) communicator = Communicator.instances[idling.front];
511                else communicator = new Communicator(config);
512 
513                communicator.lastRecv = now;
514 
515                auto nextId = requestId++;
516                communicator.setClientSocket(listener.socket.accept(), nextId);
517             }
518          }
519 
520       }
521 
522       // Exit requested, shutdown everything.
523 
524       // Close all the listeners.
525       foreach(ref listener; config.listeners)
526       {
527          listener.socket.shutdown(SocketShutdown.BOTH);
528          listener.socket.close();
529       }
530 
531       // Kill all the workers.
532       foreach(ref idx; workersAlive)
533       {
534          WorkerInfo worker = WorkerInfo.instances[idx];
535 
536          try
537          {
538             if (worker)
539             {
540                if (worker.unixSocket) worker.unixSocket.shutdown(SocketShutdown.BOTH);
541                if (worker.pi) worker.pi.kill();
542             }
543          }
544          catch (Exception e) { }
545       }
546 
547       // Call the onDaemonStop functions.
548       tryUninit!Modules();
549 
550       // Force exit.
551       import core.stdc.stdlib : exit;
552       exit(0);
553    }
554 
555    void shutdown() @nogc nothrow { exitRequested = true; }
556 
557 private:
558 
559    void checkWorkers(DaemonConfigPtr config)
560    {
561       foreach(k; workersAlive)
562       {
563          auto worker = WorkerInfo.instances[k];
564 
565          if (!worker.unixSocket.isAlive)
566          {
567             log("Killing ", worker.pi.id, ". Invalid state.");
568             worker.pi.kill();
569             worker.setStatus(WorkerInfo.State.STOPPED);
570          }
571 
572       }
573 
574       while (
575          WorkerInfo.lookup[WorkerInfo.State.IDLING].length +
576          WorkerInfo.lookup[WorkerInfo.State.PROCESSING].length < config.minWorkers
577       )
578       {
579          auto dead = workersDead();
580 
581          if (dead.empty) break;
582 
583          auto idx = dead.front();
584          WorkerInfo.instances[idx].init;
585       }
586 
587    }
588 
   // Progressive counter: its value is passed to setClientSocket() for every
   // accepted connection and then incremented.
   ulong requestId = 0;
   // Environment given to the worker processes (filled in by wake()).
   string[string] workerEnvironment;

   // Raised by shutdown() and by the posix signal handler; the main loop in
   // wake() exits when it is set.
   __gshared bool exitRequested = false;
   // Raised by the main loop in wake() after its first worker check.
   __gshared bool ready = false;
594 }
595 
596 
// Calls every function marked with @onDaemonStart found in `Modules`.
// Compilation fails if a marked symbol is not a function or can't be called
// without arguments.
void tryInit(Modules...)()
{
   import std.traits : getSymbolsByUDA, isFunction;

   static foreach(mod; Modules)
   {
      static foreach(handler; getSymbolsByUDA!(mod, onDaemonStart))
      {{
         enum symbolName = __traits(identifier, handler);

         static assert(isFunction!handler, "`" ~ symbolName ~ "` is marked with @onDaemonStart but it is not a function");
         static assert(__traits(compiles, handler()), "`" ~ symbolName ~ "` is marked with @onDaemonStart but it is not callable");

         handler();
      }}
   }
}
613 
// Calls every function marked with @onDaemonStop found in `Modules`.
// Compilation fails if a marked symbol is not a function or can't be called
// without arguments.
void tryUninit(Modules...)()
{
   import std.traits : getSymbolsByUDA, isFunction;

   static foreach(mod; Modules)
   {
      static foreach(handler; getSymbolsByUDA!(mod, onDaemonStop))
      {{
         enum symbolName = __traits(identifier, handler);

         static assert(isFunction!handler, "`" ~ symbolName ~ "` is marked with @onDaemonStop but it is not a function");
         static assert(__traits(compiles, handler()), "`" ~ symbolName ~ "` is marked with @onDaemonStop but it is not callable");

         handler();
      }}
   }
}