1/**
2 * This is a low-level messaging API upon which more structured or restrictive
3 * APIs may be built.  The general idea is that every messageable entity is
4 * represented by a common handle type called a Tid, which allows messages to
5 * be sent to logical threads that are executing in both the current process
6 * and in external processes using the same interface.  This is an important
7 * aspect of scalability because it allows the components of a program to be
8 * spread across available resources with few to no changes to the actual
9 * implementation.
10 *
11 * A logical thread is an execution context that has its own stack and which
12 * runs asynchronously to other logical threads.  These may be preemptively
13 * scheduled kernel threads, fibers (cooperative user-space threads), or some
14 * other concept with similar behavior.
15 *
16 * The type of concurrency used when logical threads are created is determined
17 * by the Scheduler selected at initialization time.  The default behavior is
18 * currently to create a new kernel thread per call to spawn, but other
19 * schedulers are available that multiplex fibers across the main thread or
20 * use some combination of the two approaches.
21 *
22 * Copyright: Copyright Sean Kelly 2009 - 2014.
23 * License:   <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
24 * Authors:   Sean Kelly, Alex R��nne Petersen, Martin Nowak
25 * Source:    $(PHOBOSSRC std/_concurrency.d)
26 */
27/*          Copyright Sean Kelly 2009 - 2014.
28 * Distributed under the Boost Software License, Version 1.0.
29 *    (See accompanying file LICENSE_1_0.txt or copy at
30 *          http://www.boost.org/LICENSE_1_0.txt)
31 */
32module std.concurrency;
33
34public import std.variant;
35
36import core.atomic;
37import core.sync.condition;
38import core.sync.mutex;
39import core.thread;
40import std.range.primitives;
41import std.range.interfaces : InputRange;
42import std.traits;
43
44///
45@system unittest
46{
47    __gshared string received;
48    static void spawnedFunc(Tid ownerTid)
49    {
50        import std.conv : text;
51        // Receive a message from the owner thread.
52        receive((int i){
53            received = text("Received the number ", i);
54
55            // Send a message back to the owner thread
56            // indicating success.
57            send(ownerTid, true);
58        });
59    }
60
61    // Start spawnedFunc in a new thread.
62    auto childTid = spawn(&spawnedFunc, thisTid);
63
64    // Send the number 42 to this new thread.
65    send(childTid, 42);
66
67    // Receive the result code.
68    auto wasSuccessful = receiveOnly!(bool);
69    assert(wasSuccessful);
70    assert(received == "Received the number 42");
71}
72
73private
74{
75    template hasLocalAliasing(T...)
76    {
77        static if (!T.length)
78            enum hasLocalAliasing = false;
79        else
80            enum hasLocalAliasing = (std.traits.hasUnsharedAliasing!(T[0]) && !is(T[0] == Tid)) ||
81                                     std.concurrency.hasLocalAliasing!(T[1 .. $]);
82    }
83
84    enum MsgType
85    {
86        standard,
87        priority,
88        linkDead,
89    }
90
91    struct Message
92    {
93        MsgType type;
94        Variant data;
95
96        this(T...)(MsgType t, T vals) if (T.length > 0)
97        {
98            static if (T.length == 1)
99            {
100                type = t;
101                data = vals[0];
102            }
103            else
104            {
105                import std.typecons : Tuple;
106
107                type = t;
108                data = Tuple!(T)(vals);
109            }
110        }
111
112        @property auto convertsTo(T...)()
113        {
114            static if (T.length == 1)
115            {
116                return is(T[0] == Variant) || data.convertsTo!(T);
117            }
118            else
119            {
120                import std.typecons : Tuple;
121                return data.convertsTo!(Tuple!(T));
122            }
123        }
124
125        @property auto get(T...)()
126        {
127            static if (T.length == 1)
128            {
129                static if (is(T[0] == Variant))
130                    return data;
131                else
132                    return data.get!(T);
133            }
134            else
135            {
136                import std.typecons : Tuple;
137                return data.get!(Tuple!(T));
138            }
139        }
140
141        auto map(Op)(Op op)
142        {
143            alias Args = Parameters!(Op);
144
145            static if (Args.length == 1)
146            {
147                static if (is(Args[0] == Variant))
148                    return op(data);
149                else
150                    return op(data.get!(Args));
151            }
152            else
153            {
154                import std.typecons : Tuple;
155                return op(data.get!(Tuple!(Args)).expand);
156            }
157        }
158    }
159
160    void checkops(T...)(T ops)
161    {
162        foreach (i, t1; T)
163        {
164            static assert(isFunctionPointer!t1 || isDelegate!t1);
165            alias a1 = Parameters!(t1);
166            alias r1 = ReturnType!(t1);
167
168            static if (i < T.length - 1 && is(r1 == void))
169            {
170                static assert(a1.length != 1 || !is(a1[0] == Variant),
171                              "function with arguments " ~ a1.stringof ~
172                              " occludes successive function");
173
174                foreach (t2; T[i + 1 .. $])
175                {
176                    static assert(isFunctionPointer!t2 || isDelegate!t2);
177                    alias a2 = Parameters!(t2);
178
179                    static assert(!is(a1 == a2),
180                        "function with arguments " ~ a1.stringof ~ " occludes successive function");
181                }
182            }
183        }
184    }
185
186    @property ref ThreadInfo thisInfo() nothrow
187    {
188        if (scheduler is null)
189            return ThreadInfo.thisInfo;
190        return scheduler.thisInfo;
191    }
192}
193
194static ~this()
195{
196    thisInfo.cleanup();
197}
198
199// Exceptions
200
201/**
202 * Thrown on calls to $(D receiveOnly) if a message other than the type
203 * the receiving thread expected is sent.
204 */
205class MessageMismatch : Exception
206{
207    ///
208    this(string msg = "Unexpected message type") @safe pure nothrow @nogc
209    {
210        super(msg);
211    }
212}
213
214/**
215 * Thrown on calls to $(D receive) if the thread that spawned the receiving
216 * thread has terminated and no more messages exist.
217 */
218class OwnerTerminated : Exception
219{
220    ///
221    this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc
222    {
223        super(msg);
224        tid = t;
225    }
226
227    Tid tid;
228}
229
230/**
231 * Thrown if a linked thread has terminated.
232 */
233class LinkTerminated : Exception
234{
235    ///
236    this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc
237    {
238        super(msg);
239        tid = t;
240    }
241
242    Tid tid;
243}
244
245/**
246 * Thrown if a message was sent to a thread via
247 * $(REF prioritySend, std,concurrency) and the receiver does not have a handler
248 * for a message of this type.
249 */
250class PriorityMessageException : Exception
251{
252    ///
253    this(Variant vals)
254    {
255        super("Priority message");
256        message = vals;
257    }
258
259    /**
260     * The message that was sent.
261     */
262    Variant message;
263}
264
265/**
266 * Thrown on mailbox crowding if the mailbox is configured with
267 * $(D OnCrowding.throwException).
268 */
269class MailboxFull : Exception
270{
271    ///
272    this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc
273    {
274        super(msg);
275        tid = t;
276    }
277
278    Tid tid;
279}
280
281/**
282 * Thrown when a Tid is missing, e.g. when $(D ownerTid) doesn't
283 * find an owner thread.
284 */
285class TidMissingException : Exception
286{
287    import std.exception : basicExceptionCtors;
288    ///
289    mixin basicExceptionCtors;
290}
291
292
293// Thread ID
294
295
296/**
297 * An opaque type used to represent a logical thread.
298 */
299struct Tid
300{
301private:
302    this(MessageBox m) @safe pure nothrow @nogc
303    {
304        mbox = m;
305    }
306
307    MessageBox mbox;
308
309public:
310
311    /**
312     * Generate a convenient string for identifying this Tid.  This is only
313     * useful to see if Tid's that are currently executing are the same or
314     * different, e.g. for logging and debugging.  It is potentially possible
315     * that a Tid executed in the future will have the same toString() output
316     * as another Tid that has already terminated.
317     */
318    void toString(scope void delegate(const(char)[]) sink)
319    {
320        import std.format : formattedWrite;
321        formattedWrite(sink, "Tid(%x)", cast(void*) mbox);
322    }
323
324}
325
326@system unittest
327{
328    // text!Tid is @system
329    import std.conv : text;
330    Tid tid;
331    assert(text(tid) == "Tid(0)");
332    auto tid2 = thisTid;
333    assert(text(tid2) != "Tid(0)");
334    auto tid3 = tid2;
335    assert(text(tid2) == text(tid3));
336}
337
338/**
339 * Returns: The $(LREF Tid) of the caller's thread.
340 */
341@property Tid thisTid() @safe
342{
343    // TODO: remove when concurrency is safe
344    static auto trus() @trusted
345    {
346        if (thisInfo.ident != Tid.init)
347            return thisInfo.ident;
348        thisInfo.ident = Tid(new MessageBox);
349        return thisInfo.ident;
350    }
351
352    return trus();
353}
354
355/**
356 * Return the Tid of the thread which spawned the caller's thread.
357 *
358 * Throws: A $(D TidMissingException) exception if
359 * there is no owner thread.
360 */
361@property Tid ownerTid()
362{
363    import std.exception : enforce;
364
365    enforce!TidMissingException(thisInfo.owner.mbox !is null, "Error: Thread has no owner thread.");
366    return thisInfo.owner;
367}
368
369@system unittest
370{
371    import std.exception : assertThrown;
372
373    static void fun()
374    {
375        string res = receiveOnly!string();
376        assert(res == "Main calling");
377        ownerTid.send("Child responding");
378    }
379
380    assertThrown!TidMissingException(ownerTid);
381    auto child = spawn(&fun);
382    child.send("Main calling");
383    string res = receiveOnly!string();
384    assert(res == "Child responding");
385}
386
387// Thread Creation
388
389private template isSpawnable(F, T...)
390{
391    template isParamsImplicitlyConvertible(F1, F2, int i = 0)
392    {
393        alias param1 = Parameters!F1;
394        alias param2 = Parameters!F2;
395        static if (param1.length != param2.length)
396            enum isParamsImplicitlyConvertible = false;
397        else static if (param1.length == i)
398            enum isParamsImplicitlyConvertible = true;
399        else static if (isImplicitlyConvertible!(param2[i], param1[i]))
400            enum isParamsImplicitlyConvertible = isParamsImplicitlyConvertible!(F1,
401                    F2, i + 1);
402        else
403            enum isParamsImplicitlyConvertible = false;
404    }
405
406    enum isSpawnable = isCallable!F && is(ReturnType!F == void)
407            && isParamsImplicitlyConvertible!(F, void function(T))
408            && (isFunctionPointer!F || !hasUnsharedAliasing!F);
409}
410
411/**
412 * Starts fn(args) in a new logical thread.
413 *
414 * Executes the supplied function in a new logical thread represented by
415 * $(D Tid).  The calling thread is designated as the owner of the new thread.
416 * When the owner thread terminates an $(D OwnerTerminated) message will be
417 * sent to the new thread, causing an $(D OwnerTerminated) exception to be
418 * thrown on $(D receive()).
419 *
420 * Params:
421 *  fn   = The function to execute.
422 *  args = Arguments to the function.
423 *
424 * Returns:
425 *  A Tid representing the new logical thread.
426 *
427 * Notes:
428 *  $(D args) must not have unshared aliasing.  In other words, all arguments
429 *  to $(D fn) must either be $(D shared) or $(D immutable) or have no
430 *  pointer indirection.  This is necessary for enforcing isolation among
431 *  threads.
432 *
433 * Example:
434 * ---
435 * import std.stdio, std.concurrency;
436 *
437 * void f1(string str)
438 * {
439 *     writeln(str);
440 * }
441 *
442 * void f2(char[] str)
443 * {
444 *     writeln(str);
445 * }
446 *
447 * void main()
448 * {
449 *     auto str = "Hello, world";
450 *
451 *     // Works:  string is immutable.
452 *     auto tid1 = spawn(&f1, str);
453 *
454 *     // Fails:  char[] has mutable aliasing.
455 *     auto tid2 = spawn(&f2, str.dup);
456 *
457 *     // New thread with anonymous function
458 *     spawn({ writeln("This is so great!"); });
459 * }
460 * ---
461 */
462Tid spawn(F, T...)(F fn, T args) if (isSpawnable!(F, T))
463{
464    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
465    return _spawn(false, fn, args);
466}
467
468/**
469 * Starts fn(args) in a logical thread and will receive a LinkTerminated
470 * message when the operation terminates.
471 *
472 * Executes the supplied function in a new logical thread represented by
473 * Tid.  This new thread is linked to the calling thread so that if either
474 * it or the calling thread terminates a LinkTerminated message will be sent
475 * to the other, causing a LinkTerminated exception to be thrown on receive().
476 * The owner relationship from spawn() is preserved as well, so if the link
477 * between threads is broken, owner termination will still result in an
478 * OwnerTerminated exception to be thrown on receive().
479 *
480 * Params:
481 *  fn   = The function to execute.
482 *  args = Arguments to the function.
483 *
484 * Returns:
485 *  A Tid representing the new thread.
486 */
487Tid spawnLinked(F, T...)(F fn, T args) if (isSpawnable!(F, T))
488{
489    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
490    return _spawn(true, fn, args);
491}
492
493/*
494 *
495 */
496private Tid _spawn(F, T...)(bool linked, F fn, T args) if (isSpawnable!(F, T))
497{
498    // TODO: MessageList and &exec should be shared.
499    auto spawnTid = Tid(new MessageBox);
500    auto ownerTid = thisTid;
501
502    void exec()
503    {
504        thisInfo.ident = spawnTid;
505        thisInfo.owner = ownerTid;
506        fn(args);
507    }
508
509    // TODO: MessageList and &exec should be shared.
510    if (scheduler !is null)
511        scheduler.spawn(&exec);
512    else
513    {
514        auto t = new Thread(&exec);
515        t.start();
516    }
517    thisInfo.links[spawnTid] = linked;
518    return spawnTid;
519}
520
521@system unittest
522{
523    void function() fn1;
524    void function(int) fn2;
525    static assert(__traits(compiles, spawn(fn1)));
526    static assert(__traits(compiles, spawn(fn2, 2)));
527    static assert(!__traits(compiles, spawn(fn1, 1)));
528    static assert(!__traits(compiles, spawn(fn2)));
529
530    void delegate(int) shared dg1;
531    shared(void delegate(int)) dg2;
532    shared(void delegate(long) shared) dg3;
533    shared(void delegate(real, int, long) shared) dg4;
534    void delegate(int) immutable dg5;
535    void delegate(int) dg6;
536    static assert(__traits(compiles, spawn(dg1, 1)));
537    static assert(__traits(compiles, spawn(dg2, 2)));
538    static assert(__traits(compiles, spawn(dg3, 3)));
539    static assert(__traits(compiles, spawn(dg4, 4, 4, 4)));
540    static assert(__traits(compiles, spawn(dg5, 5)));
541    static assert(!__traits(compiles, spawn(dg6, 6)));
542
543    auto callable1  = new class{ void opCall(int) shared {} };
544    auto callable2  = cast(shared) new class{ void opCall(int) shared {} };
545    auto callable3  = new class{ void opCall(int) immutable {} };
546    auto callable4  = cast(immutable) new class{ void opCall(int) immutable {} };
547    auto callable5  = new class{ void opCall(int) {} };
548    auto callable6  = cast(shared) new class{ void opCall(int) immutable {} };
549    auto callable7  = cast(immutable) new class{ void opCall(int) shared {} };
550    auto callable8  = cast(shared) new class{ void opCall(int) const shared {} };
551    auto callable9  = cast(const shared) new class{ void opCall(int) shared {} };
552    auto callable10 = cast(const shared) new class{ void opCall(int) const shared {} };
553    auto callable11 = cast(immutable) new class{ void opCall(int) const shared {} };
554    static assert(!__traits(compiles, spawn(callable1,  1)));
555    static assert( __traits(compiles, spawn(callable2,  2)));
556    static assert(!__traits(compiles, spawn(callable3,  3)));
557    static assert( __traits(compiles, spawn(callable4,  4)));
558    static assert(!__traits(compiles, spawn(callable5,  5)));
559    static assert(!__traits(compiles, spawn(callable6,  6)));
560    static assert(!__traits(compiles, spawn(callable7,  7)));
561    static assert( __traits(compiles, spawn(callable8,  8)));
562    static assert(!__traits(compiles, spawn(callable9,  9)));
563    static assert( __traits(compiles, spawn(callable10, 10)));
564    static assert( __traits(compiles, spawn(callable11, 11)));
565}
566
567/**
568 * Places the values as a message at the back of tid's message queue.
569 *
570 * Sends the supplied value to the thread represented by tid.  As with
571 * $(REF spawn, std,concurrency), $(D T) must not have unshared aliasing.
572 */
573void send(T...)(Tid tid, T vals)
574{
575    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
576    _send(tid, vals);
577}
578
579/**
580 * Places the values as a message on the front of tid's message queue.
581 *
582 * Send a message to $(D tid) but place it at the front of $(D tid)'s message
583 * queue instead of at the back.  This function is typically used for
584 * out-of-band communication, to signal exceptional conditions, etc.
585 */
586void prioritySend(T...)(Tid tid, T vals)
587{
588    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");
589    _send(MsgType.priority, tid, vals);
590}
591
592/*
593 * ditto
594 */
595private void _send(T...)(Tid tid, T vals)
596{
597    _send(MsgType.standard, tid, vals);
598}
599
600/*
601 * Implementation of send.  This allows parameter checking to be different for
602 * both Tid.send() and .send().
603 */
604private void _send(T...)(MsgType type, Tid tid, T vals)
605{
606    auto msg = Message(type, vals);
607    tid.mbox.put(msg);
608}
609
610/**
611 * Receives a message from another thread.
612 *
613 * Receive a message from another thread, or block if no messages of the
614 * specified types are available.  This function works by pattern matching
615 * a message against a set of delegates and executing the first match found.
616 *
617 * If a delegate that accepts a $(REF Variant, std,variant) is included as
618 * the last argument to $(D receive), it will match any message that was not
619 * matched by an earlier delegate.  If more than one argument is sent,
620 * the $(D Variant) will contain a $(REF Tuple, std,typecons) of all values
621 * sent.
622 *
623 * Example:
624 * ---
625 * import std.stdio;
626 * import std.variant;
627 * import std.concurrency;
628 *
629 * void spawnedFunction()
630 * {
631 *     receive(
632 *         (int i) { writeln("Received an int."); },
633 *         (float f) { writeln("Received a float."); },
634 *         (Variant v) { writeln("Received some other type."); }
635 *     );
636 * }
637 *
638 * void main()
639 * {
640 *      auto tid = spawn(&spawnedFunction);
641 *      send(tid, 42);
642 * }
643 * ---
644 */
645void receive(T...)( T ops )
646in
647{
648    assert(thisInfo.ident.mbox !is null,
649           "Cannot receive a message until a thread was spawned "
650           ~ "or thisTid was passed to a running thread.");
651}
652body
653{
654    checkops( ops );
655
656    thisInfo.ident.mbox.get( ops );
657}
658
659
660@safe unittest
661{
662    static assert( __traits( compiles,
663                      {
664                          receive( (Variant x) {} );
665                          receive( (int x) {}, (Variant x) {} );
666                      } ) );
667
668    static assert( !__traits( compiles,
669                       {
670                           receive( (Variant x) {}, (int x) {} );
671                       } ) );
672
673    static assert( !__traits( compiles,
674                       {
675                           receive( (int x) {}, (int x) {} );
676                       } ) );
677}
678
679// Make sure receive() works with free functions as well.
680version (unittest)
681{
682    private void receiveFunction(int x) {}
683}
684@safe unittest
685{
686    static assert( __traits( compiles,
687                      {
688                          receive( &receiveFunction );
689                          receive( &receiveFunction, (Variant x) {} );
690                      } ) );
691}
692
693
694private template receiveOnlyRet(T...)
695{
696    static if ( T.length == 1 )
697    {
698        alias receiveOnlyRet = T[0];
699    }
700    else
701    {
702        import std.typecons : Tuple;
703        alias receiveOnlyRet = Tuple!(T);
704    }
705}
706
707/**
708 * Receives only messages with arguments of types $(D T).
709 *
710 * Throws:  $(D MessageMismatch) if a message of types other than $(D T)
711 *          is received.
712 *
713 * Returns: The received message.  If $(D T.length) is greater than one,
714 *          the message will be packed into a $(REF Tuple, std,typecons).
715 *
716 * Example:
717 * ---
718 * import std.concurrency;
719 *
720 * void spawnedFunc()
721 * {
722 *     auto msg = receiveOnly!(int, string)();
723 *     assert(msg[0] == 42);
724 *     assert(msg[1] == "42");
725 * }
726 *
727 * void main()
728 * {
729 *     auto tid = spawn(&spawnedFunc);
730 *     send(tid, 42, "42");
731 * }
732 * ---
733 */
734receiveOnlyRet!(T) receiveOnly(T...)()
735in
736{
737    assert(thisInfo.ident.mbox !is null,
738        "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
739}
740body
741{
742    import std.format : format;
743    import std.typecons : Tuple;
744
745    Tuple!(T) ret;
746
747    thisInfo.ident.mbox.get((T val) {
748        static if (T.length)
749            ret.field = val;
750    },
751    (LinkTerminated e) { throw e; },
752    (OwnerTerminated e) { throw e; },
753    (Variant val) {
754        static if (T.length > 1)
755            string exp = T.stringof;
756        else
757            string exp = T[0].stringof;
758
759        throw new MessageMismatch(
760            format("Unexpected message type: expected '%s', got '%s'", exp, val.type.toString()));
761    });
762    static if (T.length == 1)
763        return ret[0];
764    else
765        return ret;
766}
767
768@system unittest
769{
770    static void t1(Tid mainTid)
771    {
772        try
773        {
774            receiveOnly!string();
775            mainTid.send("");
776        }
777        catch (Throwable th)
778        {
779            mainTid.send(th.msg);
780        }
781    }
782
783    auto tid = spawn(&t1, thisTid);
784    tid.send(1);
785    string result = receiveOnly!string();
786    assert(result == "Unexpected message type: expected 'string', got 'int'");
787}
788
789/**
790 * Tries to receive but will give up if no matches arrive within duration.
791 * Won't wait at all if provided $(REF Duration, core,time) is negative.
792 *
793 * Same as $(D receive) except that rather than wait forever for a message,
794 * it waits until either it receives a message or the given
795 * $(REF Duration, core,time) has passed. It returns $(D true) if it received a
796 * message and $(D false) if it timed out waiting for one.
797 */
798bool receiveTimeout(T...)(Duration duration, T ops)
799in
800{
801    assert(thisInfo.ident.mbox !is null,
802        "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
803}
804body
805{
806    checkops(ops);
807
808    return thisInfo.ident.mbox.get(duration, ops);
809}
810
811@safe unittest
812{
813    static assert(__traits(compiles, {
814        receiveTimeout(msecs(0), (Variant x) {});
815        receiveTimeout(msecs(0), (int x) {}, (Variant x) {});
816    }));
817
818    static assert(!__traits(compiles, {
819        receiveTimeout(msecs(0), (Variant x) {}, (int x) {});
820    }));
821
822    static assert(!__traits(compiles, {
823        receiveTimeout(msecs(0), (int x) {}, (int x) {});
824    }));
825
826    static assert(__traits(compiles, {
827        receiveTimeout(msecs(10), (int x) {}, (Variant x) {});
828    }));
829}
830
831// MessageBox Limits
832
833/**
834 * These behaviors may be specified when a mailbox is full.
835 */
836enum OnCrowding
837{
838    block, /// Wait until room is available.
839    throwException, /// Throw a MailboxFull exception.
840    ignore /// Abort the send and return.
841}
842
843private
844{
845    bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc
846    {
847        return true;
848    }
849
850    bool onCrowdingThrow(Tid tid) @safe pure
851    {
852        throw new MailboxFull(tid);
853    }
854
855    bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc
856    {
857        return false;
858    }
859}
860
861/**
862 * Sets a maximum mailbox size.
863 *
864 * Sets a limit on the maximum number of user messages allowed in the mailbox.
865 * If this limit is reached, the caller attempting to add a new message will
866 * execute the behavior specified by doThis.  If messages is zero, the mailbox
867 * is unbounded.
868 *
869 * Params:
870 *  tid      = The Tid of the thread for which this limit should be set.
871 *  messages = The maximum number of messages or zero if no limit.
872 *  doThis   = The behavior executed when a message is sent to a full
873 *             mailbox.
874 */
875void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure
876{
877    final switch (doThis)
878    {
879    case OnCrowding.block:
880        return tid.mbox.setMaxMsgs(messages, &onCrowdingBlock);
881    case OnCrowding.throwException:
882        return tid.mbox.setMaxMsgs(messages, &onCrowdingThrow);
883    case OnCrowding.ignore:
884        return tid.mbox.setMaxMsgs(messages, &onCrowdingIgnore);
885    }
886}
887
888/**
889 * Sets a maximum mailbox size.
890 *
891 * Sets a limit on the maximum number of user messages allowed in the mailbox.
892 * If this limit is reached, the caller attempting to add a new message will
893 * execute onCrowdingDoThis.  If messages is zero, the mailbox is unbounded.
894 *
895 * Params:
896 *  tid      = The Tid of the thread for which this limit should be set.
897 *  messages = The maximum number of messages or zero if no limit.
898 *  onCrowdingDoThis = The routine called when a message is sent to a full
899 *                     mailbox.
900 */
901void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis)
902{
903    tid.mbox.setMaxMsgs(messages, onCrowdingDoThis);
904}
905
906private
907{
908    __gshared Tid[string] tidByName;
909    __gshared string[][Tid] namesByTid;
910}
911
912private @property Mutex registryLock()
913{
914    __gshared Mutex impl;
915    initOnce!impl(new Mutex);
916    return impl;
917}
918
919private void unregisterMe()
920{
921    auto me = thisInfo.ident;
922    if (thisInfo.ident != Tid.init)
923    {
924        synchronized (registryLock)
925        {
926            if (auto allNames = me in namesByTid)
927            {
928                foreach (name; *allNames)
929                    tidByName.remove(name);
930                namesByTid.remove(me);
931            }
932        }
933    }
934}
935
936/**
937 * Associates name with tid.
938 *
939 * Associates name with tid in a process-local map.  When the thread
940 * represented by tid terminates, any names associated with it will be
941 * automatically unregistered.
942 *
943 * Params:
944 *  name = The name to associate with tid.
945 *  tid  = The tid register by name.
946 *
947 * Returns:
948 *  true if the name is available and tid is not known to represent a
949 *  defunct thread.
950 */
951bool register(string name, Tid tid)
952{
953    synchronized (registryLock)
954    {
955        if (name in tidByName)
956            return false;
957        if (tid.mbox.isClosed)
958            return false;
959        namesByTid[tid] ~= name;
960        tidByName[name] = tid;
961        return true;
962    }
963}
964
965/**
966 * Removes the registered name associated with a tid.
967 *
968 * Params:
969 *  name = The name to unregister.
970 *
971 * Returns:
972 *  true if the name is registered, false if not.
973 */
974bool unregister(string name)
975{
976    import std.algorithm.mutation : remove, SwapStrategy;
977    import std.algorithm.searching : countUntil;
978
979    synchronized (registryLock)
980    {
981        if (auto tid = name in tidByName)
982        {
983            auto allNames = *tid in namesByTid;
984            auto pos = countUntil(*allNames, name);
985            remove!(SwapStrategy.unstable)(*allNames, pos);
986            tidByName.remove(name);
987            return true;
988        }
989        return false;
990    }
991}
992
993/**
994 * Gets the Tid associated with name.
995 *
996 * Params:
997 *  name = The name to locate within the registry.
998 *
999 * Returns:
1000 *  The associated Tid or Tid.init if name is not registered.
1001 */
1002Tid locate(string name)
1003{
1004    synchronized (registryLock)
1005    {
1006        if (auto tid = name in tidByName)
1007            return *tid;
1008        return Tid.init;
1009    }
1010}
1011
1012/**
1013 * Encapsulates all implementation-level data needed for scheduling.
1014 *
1015 * When defining a Scheduler, an instance of this struct must be associated
1016 * with each logical thread.  It contains all implementation-level information
1017 * needed by the internal API.
1018 */
1019struct ThreadInfo
1020{
1021    Tid ident;
1022    bool[Tid] links;
1023    Tid owner;
1024
1025    /**
1026     * Gets a thread-local instance of ThreadInfo.
1027     *
1028     * Gets a thread-local instance of ThreadInfo, which should be used as the
1029     * default instance when info is requested for a thread not created by the
1030     * Scheduler.
1031     */
1032    static @property ref thisInfo() nothrow
1033    {
1034        static ThreadInfo val;
1035        return val;
1036    }
1037
1038    /**
1039     * Cleans up this ThreadInfo.
1040     *
1041     * This must be called when a scheduled thread terminates.  It tears down
1042     * the messaging system for the thread and notifies interested parties of
1043     * the thread's termination.
1044     */
1045    void cleanup()
1046    {
1047        if (ident.mbox !is null)
1048            ident.mbox.close();
1049        foreach (tid; links.keys)
1050            _send(MsgType.linkDead, tid, ident);
1051        if (owner != Tid.init)
1052            _send(MsgType.linkDead, owner, ident);
1053        unregisterMe(); // clean up registry entries
1054    }
1055}
1056
1057/**
1058 * A Scheduler controls how threading is performed by spawn.
1059 *
1060 * Implementing a Scheduler allows the concurrency mechanism used by this
1061 * module to be customized according to different needs.  By default, a call
1062 * to spawn will create a new kernel thread that executes the supplied routine
1063 * and terminates when finished.  But it is possible to create Schedulers that
1064 * reuse threads, that multiplex Fibers (coroutines) across a single thread,
1065 * or any number of other approaches.  By making the choice of Scheduler a
1066 * user-level option, std.concurrency may be used for far more types of
1067 * application than if this behavior were predefined.
1068 *
1069 * Example:
1070 * ---
1071 * import std.concurrency;
1072 * import std.stdio;
1073 *
1074 * void main()
1075 * {
1076 *     scheduler = new FiberScheduler;
1077 *     scheduler.start(
1078 *     {
1079 *         writeln("the rest of main goes here");
1080 *     });
1081 * }
1082 * ---
1083 *
1084 * Some schedulers have a dispatching loop that must run if they are to work
1085 * properly, so for the sake of consistency, when using a scheduler, start()
1086 * must be called within main().  This yields control to the scheduler and
1087 * will ensure that any spawned threads are executed in an expected manner.
1088 */
1089interface Scheduler
1090{
1091    /**
1092     * Spawns the supplied op and starts the Scheduler.
1093     *
1094     * This is intended to be called at the start of the program to yield all
1095     * scheduling to the active Scheduler instance.  This is necessary for
1096     * schedulers that explicitly dispatch threads rather than simply relying
1097     * on the operating system to do so, and so start should always be called
1098     * within main() to begin normal program execution.
1099     *
1100     * Params:
1101     *  op = A wrapper for whatever the main thread would have done in the
1102     *       absence of a custom scheduler.  It will be automatically executed
1103     *       via a call to spawn by the Scheduler.
1104     */
1105    void start(void delegate() op);
1106
1107    /**
1108     * Assigns a logical thread to execute the supplied op.
1109     *
1110     * This routine is called by spawn.  It is expected to instantiate a new
1111     * logical thread and run the supplied operation.  This thread must call
1112     * thisInfo.cleanup() when the thread terminates if the scheduled thread
1113     * is not a kernel thread--all kernel threads will have their ThreadInfo
1114     * cleaned up automatically by a thread-local destructor.
1115     *
1116     * Params:
1117     *  op = The function to execute.  This may be the actual function passed
1118     *       by the user to spawn itself, or may be a wrapper function.
1119     */
1120    void spawn(void delegate() op);
1121
1122    /**
1123     * Yields execution to another logical thread.
1124     *
1125     * This routine is called at various points within concurrency-aware APIs
1126     * to provide a scheduler a chance to yield execution when using some sort
1127     * of cooperative multithreading model.  If this is not appropriate, such
1128     * as when each logical thread is backed by a dedicated kernel thread,
1129     * this routine may be a no-op.
1130     */
1131    void yield() nothrow;
1132
1133    /**
1134     * Returns an appropriate ThreadInfo instance.
1135     *
1136     * Returns an instance of ThreadInfo specific to the logical thread that
1137     * is calling this routine or, if the calling thread was not create by
1138     * this scheduler, returns ThreadInfo.thisInfo instead.
1139     */
1140    @property ref ThreadInfo thisInfo() nothrow;
1141
1142    /**
1143     * Creates a Condition variable analog for signaling.
1144     *
1145     * Creates a new Condition variable analog which is used to check for and
1146     * to signal the addition of messages to a thread's message queue.  Like
1147     * yield, some schedulers may need to define custom behavior so that calls
1148     * to Condition.wait() yield to another thread when no new messages are
1149     * available instead of blocking.
1150     *
1151     * Params:
1152     *  m = The Mutex that will be associated with this condition.  It will be
1153     *      locked prior to any operation on the condition, and so in some
1154     *      cases a Scheduler may need to hold this reference and unlock the
1155     *      mutex before yielding execution to another logical thread.
1156     */
1157    Condition newCondition(Mutex m) nothrow;
1158}
1159
1160/**
1161 * An example Scheduler using kernel threads.
1162 *
1163 * This is an example Scheduler that mirrors the default scheduling behavior
1164 * of creating one kernel thread per call to spawn.  It is fully functional
1165 * and may be instantiated and used, but is not a necessary part of the
1166 * default functioning of this module.
1167 */
1168class ThreadScheduler : Scheduler
1169{
1170    /**
1171     * This simply runs op directly, since no real scheduling is needed by
1172     * this approach.
1173     */
1174    void start(void delegate() op)
1175    {
1176        op();
1177    }
1178
1179    /**
1180     * Creates a new kernel thread and assigns it to run the supplied op.
1181     */
1182    void spawn(void delegate() op)
1183    {
1184        auto t = new Thread(op);
1185        t.start();
1186    }
1187
1188    /**
1189     * This scheduler does no explicit multiplexing, so this is a no-op.
1190     */
1191    void yield() nothrow
1192    {
1193        // no explicit yield needed
1194    }
1195
1196    /**
1197     * Returns ThreadInfo.thisInfo, since it is a thread-local instance of
1198     * ThreadInfo, which is the correct behavior for this scheduler.
1199     */
1200    @property ref ThreadInfo thisInfo() nothrow
1201    {
1202        return ThreadInfo.thisInfo;
1203    }
1204
1205    /**
1206     * Creates a new Condition variable.  No custom behavior is needed here.
1207     */
1208    Condition newCondition(Mutex m) nothrow
1209    {
1210        return new Condition(m);
1211    }
1212}
1213
1214/**
1215 * An example Scheduler using Fibers.
1216 *
1217 * This is an example scheduler that creates a new Fiber per call to spawn
1218 * and multiplexes the execution of all fibers within the main thread.
1219 */
1220class FiberScheduler : Scheduler
1221{
1222    /**
1223     * This creates a new Fiber for the supplied op and then starts the
1224     * dispatcher.
1225     */
1226    void start(void delegate() op)
1227    {
1228        create(op);
1229        dispatch();
1230    }
1231
1232    /**
1233     * This created a new Fiber for the supplied op and adds it to the
1234     * dispatch list.
1235     */
1236    void spawn(void delegate() op) nothrow
1237    {
1238        create(op);
1239        yield();
1240    }
1241
1242    /**
1243     * If the caller is a scheduled Fiber, this yields execution to another
1244     * scheduled Fiber.
1245     */
1246    void yield() nothrow
1247    {
1248        // NOTE: It's possible that we should test whether the calling Fiber
1249        //       is an InfoFiber before yielding, but I think it's reasonable
1250        //       that any (non-Generator) fiber should yield here.
1251        if (Fiber.getThis())
1252            Fiber.yield();
1253    }
1254
1255    /**
1256     * Returns an appropriate ThreadInfo instance.
1257     *
1258     * Returns a ThreadInfo instance specific to the calling Fiber if the
1259     * Fiber was created by this dispatcher, otherwise it returns
1260     * ThreadInfo.thisInfo.
1261     */
1262    @property ref ThreadInfo thisInfo() nothrow
1263    {
1264        auto f = cast(InfoFiber) Fiber.getThis();
1265
1266        if (f !is null)
1267            return f.info;
1268        return ThreadInfo.thisInfo;
1269    }
1270
1271    /**
1272     * Returns a Condition analog that yields when wait or notify is called.
1273     */
1274    Condition newCondition(Mutex m) nothrow
1275    {
1276        return new FiberCondition(m);
1277    }
1278
1279private:
1280    static class InfoFiber : Fiber
1281    {
1282        ThreadInfo info;
1283
1284        this(void delegate() op) nothrow
1285        {
1286            super(op);
1287        }
1288    }
1289
1290    class FiberCondition : Condition
1291    {
1292        this(Mutex m) nothrow
1293        {
1294            super(m);
1295            notified = false;
1296        }
1297
1298        override void wait() nothrow
1299        {
1300            scope (exit) notified = false;
1301
1302            while (!notified)
1303                switchContext();
1304        }
1305
1306        override bool wait(Duration period) nothrow
1307        {
1308            import core.time : MonoTime;
1309
1310            scope (exit) notified = false;
1311
1312            for (auto limit = MonoTime.currTime + period;
1313                 !notified && !period.isNegative;
1314                 period = limit - MonoTime.currTime)
1315            {
1316                yield();
1317            }
1318            return notified;
1319        }
1320
1321        override void notify() nothrow
1322        {
1323            notified = true;
1324            switchContext();
1325        }
1326
1327        override void notifyAll() nothrow
1328        {
1329            notified = true;
1330            switchContext();
1331        }
1332
1333    private:
1334        void switchContext() nothrow
1335        {
1336            mutex_nothrow.unlock_nothrow();
1337            scope (exit) mutex_nothrow.lock_nothrow();
1338            yield();
1339        }
1340
1341        private bool notified;
1342    }
1343
1344private:
1345    void dispatch()
1346    {
1347        import std.algorithm.mutation : remove;
1348
1349        while (m_fibers.length > 0)
1350        {
1351            auto t = m_fibers[m_pos].call(Fiber.Rethrow.no);
1352            if (t !is null && !(cast(OwnerTerminated) t))
1353            {
1354                throw t;
1355            }
1356            if (m_fibers[m_pos].state == Fiber.State.TERM)
1357            {
1358                if (m_pos >= (m_fibers = remove(m_fibers, m_pos)).length)
1359                    m_pos = 0;
1360            }
1361            else if (m_pos++ >= m_fibers.length - 1)
1362            {
1363                m_pos = 0;
1364            }
1365        }
1366    }
1367
1368    void create(void delegate() op) nothrow
1369    {
1370        void wrap()
1371        {
1372            scope (exit)
1373            {
1374                thisInfo.cleanup();
1375            }
1376            op();
1377        }
1378
1379        m_fibers ~= new InfoFiber(&wrap);
1380    }
1381
1382private:
1383    Fiber[] m_fibers;
1384    size_t m_pos;
1385}
1386
1387@system unittest
1388{
1389    static void receive(Condition cond, ref size_t received)
1390    {
1391        while (true)
1392        {
1393            synchronized (cond.mutex)
1394            {
1395                cond.wait();
1396                ++received;
1397            }
1398        }
1399    }
1400
1401    static void send(Condition cond, ref size_t sent)
1402    {
1403        while (true)
1404        {
1405            synchronized (cond.mutex)
1406            {
1407                ++sent;
1408                cond.notify();
1409            }
1410        }
1411    }
1412
1413    auto fs = new FiberScheduler;
1414    auto mtx = new Mutex;
1415    auto cond = fs.newCondition(mtx);
1416
1417    size_t received, sent;
1418    auto waiter = new Fiber({ receive(cond, received); }), notifier = new Fiber({ send(cond, sent); });
1419    waiter.call();
1420    assert(received == 0);
1421    notifier.call();
1422    assert(sent == 1);
1423    assert(received == 0);
1424    waiter.call();
1425    assert(received == 1);
1426    waiter.call();
1427    assert(received == 1);
1428}
1429
1430/**
1431 * Sets the Scheduler behavior within the program.
1432 *
1433 * This variable sets the Scheduler behavior within this program.  Typically,
1434 * when setting a Scheduler, scheduler.start() should be called in main.  This
1435 * routine will not return until program execution is complete.
1436 */
1437__gshared Scheduler scheduler;
1438
1439// Generator
1440
1441/**
1442 * If the caller is a Fiber and is not a Generator, this function will call
1443 * scheduler.yield() or Fiber.yield(), as appropriate.
1444 */
1445void yield() nothrow
1446{
1447    auto fiber = Fiber.getThis();
1448    if (!(cast(IsGenerator) fiber))
1449    {
1450        if (scheduler is null)
1451        {
1452            if (fiber)
1453                return Fiber.yield();
1454        }
1455        else
1456            scheduler.yield();
1457    }
1458}
1459
1460/// Used to determine whether a Generator is running.
1461private interface IsGenerator {}
1462
1463
1464/**
1465 * A Generator is a Fiber that periodically returns values of type T to the
1466 * caller via yield.  This is represented as an InputRange.
1467 *
1468 * Example:
1469 * ---
1470 * import std.concurrency;
1471 * import std.stdio;
1472 *
1473 *
1474 * void main()
1475 * {
1476 *     auto tid = spawn(
1477 *     {
1478 *         while (true)
1479 *         {
1480 *             writeln(receiveOnly!int());
1481 *         }
1482 *     });
1483 *
1484 *     auto r = new Generator!int(
1485 *     {
1486 *         foreach (i; 1 .. 10)
1487 *             yield(i);
1488 *     });
1489 *
1490 *     foreach (e; r)
1491 *     {
1492 *         tid.send(e);
1493 *     }
1494 * }
1495 * ---
1496 */
1497class Generator(T) :
1498    Fiber, IsGenerator, InputRange!T
1499{
1500    /**
1501     * Initializes a generator object which is associated with a static
1502     * D function.  The function will be called once to prepare the range
1503     * for iteration.
1504     *
1505     * Params:
1506     *  fn = The fiber function.
1507     *
1508     * In:
1509     *  fn must not be null.
1510     */
1511    this(void function() fn)
1512    {
1513        super(fn);
1514        call();
1515    }
1516
1517    /**
1518     * Initializes a generator object which is associated with a static
1519     * D function.  The function will be called once to prepare the range
1520     * for iteration.
1521     *
1522     * Params:
1523     *  fn = The fiber function.
1524     *  sz = The stack size for this fiber.
1525     *
1526     * In:
1527     *  fn must not be null.
1528     */
1529    this(void function() fn, size_t sz)
1530    {
1531        super(fn, sz);
1532        call();
1533    }
1534
1535    /**
1536     * Initializes a generator object which is associated with a dynamic
1537     * D function.  The function will be called once to prepare the range
1538     * for iteration.
1539     *
1540     * Params:
1541     *  dg = The fiber function.
1542     *
1543     * In:
1544     *  dg must not be null.
1545     */
1546    this(void delegate() dg)
1547    {
1548        super(dg);
1549        call();
1550    }
1551
1552    /**
1553     * Initializes a generator object which is associated with a dynamic
1554     * D function.  The function will be called once to prepare the range
1555     * for iteration.
1556     *
1557     * Params:
1558     *  dg = The fiber function.
1559     *  sz = The stack size for this fiber.
1560     *
1561     * In:
1562     *  dg must not be null.
1563     */
1564    this(void delegate() dg, size_t sz)
1565    {
1566        super(dg, sz);
1567        call();
1568    }
1569
1570    /**
1571     * Returns true if the generator is empty.
1572     */
1573    final bool empty() @property
1574    {
1575        return m_value is null || state == State.TERM;
1576    }
1577
1578    /**
1579     * Obtains the next value from the underlying function.
1580     */
1581    final void popFront()
1582    {
1583        call();
1584    }
1585
1586    /**
1587     * Returns the most recently generated value by shallow copy.
1588     */
1589    final T front() @property
1590    {
1591        return *m_value;
1592    }
1593
1594    /**
1595     * Returns the most recently generated value without executing a
1596     * copy contructor. Will not compile for element types defining a
1597     * postblit, because Generator does not return by reference.
1598     */
1599    final T moveFront()
1600    {
1601        static if (!hasElaborateCopyConstructor!T)
1602        {
1603            return front;
1604        }
1605        else
1606        {
1607            static assert(0,
1608                    "Fiber front is always rvalue and thus cannot be moved since it defines a postblit.");
1609        }
1610    }
1611
1612    final int opApply(scope int delegate(T) loopBody)
1613    {
1614        int broken;
1615        for (; !empty; popFront())
1616        {
1617            broken = loopBody(front);
1618            if (broken) break;
1619        }
1620        return broken;
1621    }
1622
1623    final int opApply(scope int delegate(size_t, T) loopBody)
1624    {
1625        int broken;
1626        for (size_t i; !empty; ++i, popFront())
1627        {
1628            broken = loopBody(i, front);
1629            if (broken) break;
1630        }
1631        return broken;
1632    }
1633private:
1634    T* m_value;
1635}
1636
1637/**
1638 * Yields a value of type T to the caller of the currently executing
1639 * generator.
1640 *
1641 * Params:
1642 *  value = The value to yield.
1643 */
1644void yield(T)(ref T value)
1645{
1646    Generator!T cur = cast(Generator!T) Fiber.getThis();
1647    if (cur !is null && cur.state == Fiber.State.EXEC)
1648    {
1649        cur.m_value = &value;
1650        return Fiber.yield();
1651    }
1652    throw new Exception("yield(T) called with no active generator for the supplied type");
1653}
1654
1655/// ditto
1656void yield(T)(T value)
1657{
1658    yield(value);
1659}
1660
1661@system unittest
1662{
1663    import core.exception;
1664    import std.exception;
1665
1666    static void testScheduler(Scheduler s)
1667    {
1668        scheduler = s;
1669        scheduler.start({
1670            auto tid = spawn({
1671                int i;
1672
1673                try
1674                {
1675                    for (i = 1; i < 10; i++)
1676                    {
1677                        assertNotThrown!AssertError(assert(receiveOnly!int() == i));
1678                    }
1679                }
1680                catch (OwnerTerminated e)
1681                {
1682
1683                }
1684
1685                // i will advance 1 past the last value expected
1686                assert(i == 4);
1687            });
1688
1689            auto r = new Generator!int({
1690                assertThrown!Exception(yield(2.0));
1691                yield(); // ensure this is a no-op
1692                yield(1);
1693                yield(); // also once something has been yielded
1694                yield(2);
1695                yield(3);
1696            });
1697
1698            foreach (e; r)
1699            {
1700                tid.send(e);
1701            }
1702        });
1703        scheduler = null;
1704    }
1705
1706    testScheduler(new ThreadScheduler);
1707    testScheduler(new FiberScheduler);
1708}
1709///
1710@system unittest
1711{
1712    import std.range;
1713
1714    InputRange!int myIota = iota(10).inputRangeObject;
1715
1716    myIota.popFront();
1717    myIota.popFront();
1718    assert(myIota.moveFront == 2);
1719    assert(myIota.front == 2);
1720    myIota.popFront();
1721    assert(myIota.front == 3);
1722
1723    //can be assigned to std.range.interfaces.InputRange directly
1724    myIota = new Generator!int(
1725    {
1726        foreach (i; 0 .. 10) yield(i);
1727    });
1728
1729    myIota.popFront();
1730    myIota.popFront();
1731    assert(myIota.moveFront == 2);
1732    assert(myIota.front == 2);
1733    myIota.popFront();
1734    assert(myIota.front == 3);
1735
1736    size_t[2] counter = [0, 0];
1737    foreach (i, unused; myIota) counter[] += [1, i];
1738
1739    assert(myIota.empty);
1740    assert(counter == [7, 21]);
1741}
1742
1743private
1744{
1745    /*
1746     * A MessageBox is a message queue for one thread.  Other threads may send
1747     * messages to this owner by calling put(), and the owner receives them by
1748     * calling get().  The put() call is therefore effectively shared and the
1749     * get() call is effectively local.  setMaxMsgs may be used by any thread
1750     * to limit the size of the message queue.
1751     */
1752    class MessageBox
1753    {
1754        this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */
1755        {
1756            m_lock = new Mutex;
1757            m_closed = false;
1758
1759            if (scheduler is null)
1760            {
1761                m_putMsg = new Condition(m_lock);
1762                m_notFull = new Condition(m_lock);
1763            }
1764            else
1765            {
1766                m_putMsg = scheduler.newCondition(m_lock);
1767                m_notFull = scheduler.newCondition(m_lock);
1768            }
1769        }
1770
1771        ///
1772        final @property bool isClosed() @safe @nogc pure
1773        {
1774            synchronized (m_lock)
1775            {
1776                return m_closed;
1777            }
1778        }
1779
1780        /*
1781         * Sets a limit on the maximum number of user messages allowed in the
1782         * mailbox.  If this limit is reached, the caller attempting to add
1783         * a new message will execute call.  If num is zero, there is no limit
1784         * on the message queue.
1785         *
1786         * Params:
1787         *  num  = The maximum size of the queue or zero if the queue is
1788         *         unbounded.
1789         *  call = The routine to call when the queue is full.
1790         */
1791        final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure
1792        {
1793            synchronized (m_lock)
1794            {
1795                m_maxMsgs = num;
1796                m_onMaxMsgs = call;
1797            }
1798        }
1799
1800        /*
1801         * If maxMsgs is not set, the message is added to the queue and the
1802         * owner is notified.  If the queue is full, the message will still be
1803         * accepted if it is a control message, otherwise onCrowdingDoThis is
1804         * called.  If the routine returns true, this call will block until
1805         * the owner has made space available in the queue.  If it returns
1806         * false, this call will abort.
1807         *
1808         * Params:
1809         *  msg = The message to put in the queue.
1810         *
1811         * Throws:
1812         *  An exception if the queue is full and onCrowdingDoThis throws.
1813         */
1814        final void put(ref Message msg)
1815        {
1816            synchronized (m_lock)
1817            {
1818                // TODO: Generate an error here if m_closed is true, or maybe
1819                //       put a message in the caller's queue?
1820                if (!m_closed)
1821                {
1822                    while (true)
1823                    {
1824                        if (isPriorityMsg(msg))
1825                        {
1826                            m_sharedPty.put(msg);
1827                            m_putMsg.notify();
1828                            return;
1829                        }
1830                        if (!mboxFull() || isControlMsg(msg))
1831                        {
1832                            m_sharedBox.put(msg);
1833                            m_putMsg.notify();
1834                            return;
1835                        }
1836                        if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid))
1837                        {
1838                            return;
1839                        }
1840                        m_putQueue++;
1841                        m_notFull.wait();
1842                        m_putQueue--;
1843                    }
1844                }
1845            }
1846        }
1847
1848        /*
1849         * Matches ops against each message in turn until a match is found.
1850         *
1851         * Params:
1852         *  ops = The operations to match.  Each may return a bool to indicate
1853         *        whether a message with a matching type is truly a match.
1854         *
1855         * Returns:
1856         *  true if a message was retrieved and false if not (such as if a
1857         *  timeout occurred).
1858         *
1859         * Throws:
1860         *  LinkTerminated if a linked thread terminated, or OwnerTerminated
1861         * if the owner thread terminates and no existing messages match the
1862         * supplied ops.
1863         */
1864        bool get(T...)(scope T vals)
1865        {
1866            import std.meta : AliasSeq;
1867
1868            static assert(T.length);
1869
1870            static if (isImplicitlyConvertible!(T[0], Duration))
1871            {
1872                alias Ops = AliasSeq!(T[1 .. $]);
1873                alias ops = vals[1 .. $];
1874                enum timedWait = true;
1875                Duration period = vals[0];
1876            }
1877            else
1878            {
1879                alias Ops = AliasSeq!(T);
1880                alias ops = vals[0 .. $];
1881                enum timedWait = false;
1882            }
1883
1884            bool onStandardMsg(ref Message msg)
1885            {
1886                foreach (i, t; Ops)
1887                {
1888                    alias Args = Parameters!(t);
1889                    auto op = ops[i];
1890
1891                    if (msg.convertsTo!(Args))
1892                    {
1893                        static if (is(ReturnType!(t) == bool))
1894                        {
1895                            return msg.map(op);
1896                        }
1897                        else
1898                        {
1899                            msg.map(op);
1900                            return true;
1901                        }
1902                    }
1903                }
1904                return false;
1905            }
1906
1907            bool onLinkDeadMsg(ref Message msg)
1908            {
1909                assert(msg.convertsTo!(Tid));
1910                auto tid = msg.get!(Tid);
1911
1912                if (bool* pDepends = tid in thisInfo.links)
1913                {
1914                    auto depends = *pDepends;
1915                    thisInfo.links.remove(tid);
1916                    // Give the owner relationship precedence.
1917                    if (depends && tid != thisInfo.owner)
1918                    {
1919                        auto e = new LinkTerminated(tid);
1920                        auto m = Message(MsgType.standard, e);
1921                        if (onStandardMsg(m))
1922                            return true;
1923                        throw e;
1924                    }
1925                }
1926                if (tid == thisInfo.owner)
1927                {
1928                    thisInfo.owner = Tid.init;
1929                    auto e = new OwnerTerminated(tid);
1930                    auto m = Message(MsgType.standard, e);
1931                    if (onStandardMsg(m))
1932                        return true;
1933                    throw e;
1934                }
1935                return false;
1936            }
1937
1938            bool onControlMsg(ref Message msg)
1939            {
1940                switch (msg.type)
1941                {
1942                case MsgType.linkDead:
1943                    return onLinkDeadMsg(msg);
1944                default:
1945                    return false;
1946                }
1947            }
1948
1949            bool scan(ref ListT list)
1950            {
1951                for (auto range = list[]; !range.empty;)
1952                {
1953                    // Only the message handler will throw, so if this occurs
1954                    // we can be certain that the message was handled.
1955                    scope (failure)
1956                        list.removeAt(range);
1957
1958                    if (isControlMsg(range.front))
1959                    {
1960                        if (onControlMsg(range.front))
1961                        {
1962                            // Although the linkDead message is a control message,
1963                            // it can be handled by the user.  Since the linkDead
1964                            // message throws if not handled, if we get here then
1965                            // it has been handled and we can return from receive.
1966                            // This is a weird special case that will have to be
1967                            // handled in a more general way if more are added.
1968                            if (!isLinkDeadMsg(range.front))
1969                            {
1970                                list.removeAt(range);
1971                                continue;
1972                            }
1973                            list.removeAt(range);
1974                            return true;
1975                        }
1976                        range.popFront();
1977                        continue;
1978                    }
1979                    else
1980                    {
1981                        if (onStandardMsg(range.front))
1982                        {
1983                            list.removeAt(range);
1984                            return true;
1985                        }
1986                        range.popFront();
1987                        continue;
1988                    }
1989                }
1990                return false;
1991            }
1992
1993            bool pty(ref ListT list)
1994            {
1995                if (!list.empty)
1996                {
1997                    auto range = list[];
1998
1999                    if (onStandardMsg(range.front))
2000                    {
2001                        list.removeAt(range);
2002                        return true;
2003                    }
2004                    if (range.front.convertsTo!(Throwable))
2005                        throw range.front.get!(Throwable);
2006                    else if (range.front.convertsTo!(shared(Throwable)))
2007                        throw range.front.get!(shared(Throwable));
2008                    else
2009                        throw new PriorityMessageException(range.front.data);
2010                }
2011                return false;
2012            }
2013
2014            static if (timedWait)
2015            {
2016                import core.time : MonoTime;
2017                auto limit = MonoTime.currTime + period;
2018            }
2019
2020            while (true)
2021            {
2022                ListT arrived;
2023
2024                if (pty(m_localPty) || scan(m_localBox))
2025                {
2026                    return true;
2027                }
2028                yield();
2029                synchronized (m_lock)
2030                {
2031                    updateMsgCount();
2032                    while (m_sharedPty.empty && m_sharedBox.empty)
2033                    {
2034                        // NOTE: We're notifying all waiters here instead of just
2035                        //       a few because the onCrowding behavior may have
2036                        //       changed and we don't want to block sender threads
2037                        //       unnecessarily if the new behavior is not to block.
2038                        //       This will admittedly result in spurious wakeups
2039                        //       in other situations, but what can you do?
2040                        if (m_putQueue && !mboxFull())
2041                            m_notFull.notifyAll();
2042                        static if (timedWait)
2043                        {
2044                            if (period <= Duration.zero || !m_putMsg.wait(period))
2045                                return false;
2046                        }
2047                        else
2048                        {
2049                            m_putMsg.wait();
2050                        }
2051                    }
2052                    m_localPty.put(m_sharedPty);
2053                    arrived.put(m_sharedBox);
2054                }
2055                if (m_localPty.empty)
2056                {
2057                    scope (exit) m_localBox.put(arrived);
2058                    if (scan(arrived))
2059                    {
2060                        return true;
2061                    }
2062                    else
2063                    {
2064                        static if (timedWait)
2065                        {
2066                            period = limit - MonoTime.currTime;
2067                        }
2068                        continue;
2069                    }
2070                }
2071                m_localBox.put(arrived);
2072                pty(m_localPty);
2073                return true;
2074            }
2075        }
2076
2077        /*
2078         * Called on thread termination.  This routine processes any remaining
2079         * control messages, clears out message queues, and sets a flag to
2080         * reject any future messages.
2081         */
2082        final void close()
2083        {
2084            static void onLinkDeadMsg(ref Message msg)
2085            {
2086                assert(msg.convertsTo!(Tid));
2087                auto tid = msg.get!(Tid);
2088
2089                thisInfo.links.remove(tid);
2090                if (tid == thisInfo.owner)
2091                    thisInfo.owner = Tid.init;
2092            }
2093
2094            static void sweep(ref ListT list)
2095            {
2096                for (auto range = list[]; !range.empty; range.popFront())
2097                {
2098                    if (range.front.type == MsgType.linkDead)
2099                        onLinkDeadMsg(range.front);
2100                }
2101            }
2102
2103            ListT arrived;
2104
2105            sweep(m_localBox);
2106            synchronized (m_lock)
2107            {
2108                arrived.put(m_sharedBox);
2109                m_closed = true;
2110            }
2111            m_localBox.clear();
2112            sweep(arrived);
2113        }
2114
2115    private:
2116        // Routines involving local data only, no lock needed.
2117
2118        bool mboxFull() @safe @nogc pure nothrow
2119        {
2120            return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
2121        }
2122
2123        void updateMsgCount() @safe @nogc pure nothrow
2124        {
2125            m_localMsgs = m_localBox.length;
2126        }
2127
2128        bool isControlMsg(ref Message msg) @safe @nogc pure nothrow
2129        {
2130            return msg.type != MsgType.standard && msg.type != MsgType.priority;
2131        }
2132
2133        bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow
2134        {
2135            return msg.type == MsgType.priority;
2136        }
2137
2138        bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow
2139        {
2140            return msg.type == MsgType.linkDead;
2141        }
2142
2143        alias OnMaxFn = bool function(Tid);
2144        alias ListT = List!(Message);
2145
2146        ListT m_localBox;
2147        ListT m_localPty;
2148
2149        Mutex m_lock;
2150        Condition m_putMsg;
2151        Condition m_notFull;
2152        size_t m_putQueue;
2153        ListT m_sharedBox;
2154        ListT m_sharedPty;
2155        OnMaxFn m_onMaxMsgs;
2156        size_t m_localMsgs;
2157        size_t m_maxMsgs;
2158        bool m_closed;
2159    }
2160
2161    /*
2162     *
2163     */
2164    struct List(T)
2165    {
2166        struct Range
2167        {
2168            import std.exception : enforce;
2169
2170            @property bool empty() const
2171            {
2172                return !m_prev.next;
2173            }
2174
2175            @property ref T front()
2176            {
2177                enforce(m_prev.next, "invalid list node");
2178                return m_prev.next.val;
2179            }
2180
2181            @property void front(T val)
2182            {
2183                enforce(m_prev.next, "invalid list node");
2184                m_prev.next.val = val;
2185            }
2186
2187            void popFront()
2188            {
2189                enforce(m_prev.next, "invalid list node");
2190                m_prev = m_prev.next;
2191            }
2192
2193            private this(Node* p)
2194            {
2195                m_prev = p;
2196            }
2197
2198            private Node* m_prev;
2199        }
2200
2201        void put(T val)
2202        {
2203            put(newNode(val));
2204        }
2205
2206        void put(ref List!(T) rhs)
2207        {
2208            if (!rhs.empty)
2209            {
2210                put(rhs.m_first);
2211                while (m_last.next !is null)
2212                {
2213                    m_last = m_last.next;
2214                    m_count++;
2215                }
2216                rhs.m_first = null;
2217                rhs.m_last = null;
2218                rhs.m_count = 0;
2219            }
2220        }
2221
2222        Range opSlice()
2223        {
2224            return Range(cast(Node*)&m_first);
2225        }
2226
2227        void removeAt(Range r)
2228        {
2229            import std.exception : enforce;
2230
2231            assert(m_count);
2232            Node* n = r.m_prev;
2233            enforce(n && n.next, "attempting to remove invalid list node");
2234
2235            if (m_last is m_first)
2236                m_last = null;
2237            else if (m_last is n.next)
2238                m_last = n; // nocoverage
2239            Node* to_free = n.next;
2240            n.next = n.next.next;
2241            freeNode(to_free);
2242            m_count--;
2243        }
2244
2245        @property size_t length()
2246        {
2247            return m_count;
2248        }
2249
2250        void clear()
2251        {
2252            m_first = m_last = null;
2253            m_count = 0;
2254        }
2255
2256        @property bool empty()
2257        {
2258            return m_first is null;
2259        }
2260
2261    private:
2262        struct Node
2263        {
2264            Node* next;
2265            T val;
2266
2267            this(T v)
2268            {
2269                val = v;
2270            }
2271        }
2272
2273        static shared struct SpinLock
2274        {
2275            void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } }
2276            void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); }
2277            bool locked;
2278        }
2279
2280        static shared SpinLock sm_lock;
2281        static shared Node* sm_head;
2282
2283        Node* newNode(T v)
2284        {
2285            Node* n;
2286            {
2287                sm_lock.lock();
2288                scope (exit) sm_lock.unlock();
2289
2290                if (sm_head)
2291                {
2292                    n = cast(Node*) sm_head;
2293                    sm_head = sm_head.next;
2294                }
2295            }
2296            if (n)
2297            {
2298                import std.conv : emplace;
2299                emplace!Node(n, v);
2300            }
2301            else
2302            {
2303                n = new Node(v);
2304            }
2305            return n;
2306        }
2307
2308        void freeNode(Node* n)
2309        {
2310            // destroy val to free any owned GC memory
2311            destroy(n.val);
2312
2313            sm_lock.lock();
2314            scope (exit) sm_lock.unlock();
2315
2316            auto sn = cast(shared(Node)*) n;
2317            sn.next = sm_head;
2318            sm_head = sn;
2319        }
2320
2321        void put(Node* n)
2322        {
2323            m_count++;
2324            if (!empty)
2325            {
2326                m_last.next = n;
2327                m_last = n;
2328                return;
2329            }
2330            m_first = n;
2331            m_last = n;
2332        }
2333
2334        Node* m_first;
2335        Node* m_last;
2336        size_t m_count;
2337    }
2338}
2339
2340version (unittest)
2341{
2342    import std.stdio;
2343    import std.typecons : tuple, Tuple;
2344
2345    void testfn(Tid tid)
2346    {
2347        receive((float val) { assert(0); }, (int val, int val2) {
2348            assert(val == 42 && val2 == 86);
2349        });
2350        receive((Tuple!(int, int) val) { assert(val[0] == 42 && val[1] == 86); });
2351        receive((Variant val) {  });
2352        receive((string val) {
2353            if ("the quick brown fox" != val)
2354                return false;
2355            return true;
2356        }, (string val) { assert(false); });
2357        prioritySend(tid, "done");
2358    }
2359
2360    void runTest(Tid tid)
2361    {
2362        send(tid, 42, 86);
2363        send(tid, tuple(42, 86));
2364        send(tid, "hello", "there");
2365        send(tid, "the quick brown fox");
2366        receive((string val) { assert(val == "done"); });
2367    }
2368
2369    void simpleTest()
2370    {
2371        auto tid = spawn(&testfn, thisTid);
2372        runTest(tid);
2373
2374        // Run the test again with a limited mailbox size.
2375        tid = spawn(&testfn, thisTid);
2376        setMaxMailboxSize(tid, 2, OnCrowding.block);
2377        runTest(tid);
2378    }
2379
2380    @system unittest
2381    {
2382        simpleTest();
2383    }
2384
2385    @system unittest
2386    {
2387        scheduler = new ThreadScheduler;
2388        simpleTest();
2389        scheduler = null;
2390    }
2391}
2392
2393private @property Mutex initOnceLock()
2394{
2395    __gshared Mutex lock;
2396    if (auto mtx = cast() atomicLoad!(MemoryOrder.acq)(*cast(shared)&lock))
2397        return mtx;
2398    auto mtx = new Mutex;
2399    if (cas(cast(shared)&lock, cast(shared) null, cast(shared) mtx))
2400        return mtx;
2401    return cast() atomicLoad!(MemoryOrder.acq)(*cast(shared)&lock);
2402}
2403
2404/**
2405 * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a
2406 * thread-safe manner.
2407 *
2408 * The implementation guarantees that all threads simultaneously calling
2409 * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is
2410 * fully initialized. All side-effects of $(D_PARAM init) are globally visible
2411 * afterwards.
2412 *
2413 * Params:
2414 *   var = The variable to initialize
2415 *   init = The lazy initializer value
2416 *
2417 * Returns:
2418 *   A reference to the initialized variable
2419 */
2420auto ref initOnce(alias var)(lazy typeof(var) init)
2421{
2422    return initOnce!var(init, initOnceLock);
2423}
2424
2425/// A typical use-case is to perform lazy but thread-safe initialization.
2426@system unittest
2427{
2428    static class MySingleton
2429    {
2430        static MySingleton instance()
2431        {
2432            static __gshared MySingleton inst;
2433            return initOnce!inst(new MySingleton);
2434        }
2435    }
2436
2437    assert(MySingleton.instance !is null);
2438}
2439
2440@system unittest
2441{
2442    static class MySingleton
2443    {
2444        static MySingleton instance()
2445        {
2446            static __gshared MySingleton inst;
2447            return initOnce!inst(new MySingleton);
2448        }
2449
2450    private:
2451        this() { val = ++cnt; }
2452        size_t val;
2453        static __gshared size_t cnt;
2454    }
2455
2456    foreach (_; 0 .. 10)
2457        spawn({ ownerTid.send(MySingleton.instance.val); });
2458    foreach (_; 0 .. 10)
2459        assert(receiveOnly!size_t == MySingleton.instance.val);
2460    assert(MySingleton.cnt == 1);
2461}
2462
2463/**
2464 * Same as above, but takes a separate mutex instead of sharing one among
2465 * all initOnce instances.
2466 *
2467 * This should be used to avoid dead-locks when the $(D_PARAM init)
2468 * expression waits for the result of another thread that might also
2469 * call initOnce. Use with care.
2470 *
2471 * Params:
2472 *   var = The variable to initialize
2473 *   init = The lazy initializer value
2474 *   mutex = A mutex to prevent race conditions
2475 *
2476 * Returns:
2477 *   A reference to the initialized variable
2478 */
2479auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
2480{
2481    // check that var is global, can't take address of a TLS variable
2482    static assert(is(typeof({ __gshared p = &var; })),
2483        "var must be 'static shared' or '__gshared'.");
2484    import core.atomic : atomicLoad, MemoryOrder, atomicStore;
2485
2486    static shared bool flag;
2487    if (!atomicLoad!(MemoryOrder.acq)(flag))
2488    {
2489        synchronized (mutex)
2490        {
2491            if (!atomicLoad!(MemoryOrder.acq)(flag))
2492            {
2493                var = init;
2494                atomicStore!(MemoryOrder.rel)(flag, true);
2495            }
2496        }
2497    }
2498    return var;
2499}
2500
2501/// Use a separate mutex when init blocks on another thread that might also call initOnce.
2502@system unittest
2503{
2504    import core.sync.mutex : Mutex;
2505
2506    static shared bool varA, varB;
2507    __gshared Mutex m;
2508    m = new Mutex;
2509
2510    spawn({
2511        // use a different mutex for varB to avoid a dead-lock
2512        initOnce!varB(true, m);
2513        ownerTid.send(true);
2514    });
2515    // init depends on the result of the spawned thread
2516    initOnce!varA(receiveOnly!bool);
2517    assert(varA == true);
2518    assert(varB == true);
2519}
2520
2521@system unittest
2522{
2523    static shared bool a;
2524    __gshared bool b;
2525    static bool c;
2526    bool d;
2527    initOnce!a(true);
2528    initOnce!b(true);
2529    static assert(!__traits(compiles, initOnce!c(true))); // TLS
2530    static assert(!__traits(compiles, initOnce!d(true))); // local variable
2531}
2532