/**
 * This is a low-level messaging API upon which more structured or restrictive
 * APIs may be built. The general idea is that every messageable entity is
 * represented by a common handle type called a Tid, which allows messages to
 * be sent to logical threads that are executing in both the current process
 * and in external processes using the same interface. This is an important
 * aspect of scalability because it allows the components of a program to be
 * spread across available resources with few to no changes to the actual
 * implementation.
 *
 * A logical thread is an execution context that has its own stack and which
 * runs asynchronously to other logical threads. These may be preemptively
 * scheduled kernel threads, fibers (cooperative user-space threads), or some
 * other concept with similar behavior.
 *
 * The type of concurrency used when logical threads are created is determined
 * by the Scheduler selected at initialization time. The default behavior is
 * currently to create a new kernel thread per call to spawn, but other
 * schedulers are available that multiplex fibers across the main thread or
 * use some combination of the two approaches.
 *
 * Copyright: Copyright Sean Kelly 2009 - 2014.
 * License: <a href="http://www.boost.org/LICENSE_1_0.txt">Boost License 1.0</a>.
 * Authors: Sean Kelly, Alex Rønne Petersen, Martin Nowak
 * Source: $(PHOBOSSRC std/_concurrency.d)
 */
/* Copyright Sean Kelly 2009 - 2014.
 * Distributed under the Boost Software License, Version 1.0.
* (See accompanying file LICENSE_1_0.txt or copy at
 * http://www.boost.org/LICENSE_1_0.txt)
 */
module std.concurrency;

public import std.variant;

import core.atomic;
import core.sync.condition;
import core.sync.mutex;
import core.thread;
import std.range.primitives;
import std.range.interfaces : InputRange;
import std.traits;

///
@system unittest
{
    __gshared string received;

    // Worker: waits for one int, records it, then acknowledges to its owner.
    static void spawnedFunc(Tid ownerTid)
    {
        import std.conv : text;

        receive((int number) {
            received = text("Received the number ", number);

            // Report success back to the owner thread.
            send(ownerTid, true);
        });
    }

    // Run spawnedFunc in a new thread, handing it our own Tid.
    auto childTid = spawn(&spawnedFunc, thisTid);

    // Post the number 42 to the new thread.
    send(childTid, 42);

    // Block until the result code comes back.
    auto wasSuccessful = receiveOnly!(bool);
    assert(wasSuccessful);
    assert(received == "Received the number 42");
}

private
{
    // True if any type in T has unshared aliasing. Tid itself is exempt.
    template hasLocalAliasing(T...)
    {
        static if (T.length == 0)
            enum hasLocalAliasing = false;
        else static if (!is(T[0] == Tid) && std.traits.hasUnsharedAliasing!(T[0]))
            enum hasLocalAliasing = true;
        else
            enum hasLocalAliasing = std.concurrency.hasLocalAliasing!(T[1 .. $]);
    }

    // Category tag carried by every Message.
    enum MsgType
    {
        standard,
        priority,
        linkDead,
    }

    // A tagged payload: one value is stored directly in the Variant,
    // several values are stored as a Tuple of those values.
    struct Message
    {
        MsgType type;
        Variant data;

        this(T...)(MsgType t, T vals) if (T.length > 0)
        {
            type = t;

            static if (T.length == 1)
            {
                data = vals[0];
            }
            else
            {
                import std.typecons : Tuple;

                data = Tuple!(T)(vals);
            }
        }

        // Whether the payload can be read as T (or as Tuple!T for several types).
        @property auto convertsTo(T...)()
        {
            static if (T.length != 1)
            {
                import std.typecons : Tuple;
                return data.convertsTo!(Tuple!(T));
            }
            else
            {
                return is(T[0] == Variant) || data.convertsTo!(T);
            }
        }

        // Reads the payload as T (or as Tuple!T); Variant yields the raw payload.
        @property auto get(T...)()
        {
            static if (T.length != 1)
            {
                import std.typecons : Tuple;
                return data.get!(Tuple!(T));
            }
            else static if (is(T[0] == Variant))
            {
                return data;
            }
            else
            {
                return data.get!(T);
            }
        }

        // Calls op with the payload unpacked according to op's parameter list.
        auto map(Op)(Op op)
        {
            alias Args = Parameters!(Op);

            static if (Args.length != 1)
            {
                import std.typecons : Tuple;
                return op(data.get!(Tuple!(Args)).expand);
            }
            else static if (is(Args[0] == Variant))
            {
                return op(data);
            }
            else
            {
                return op(data.get!(Args));
            }
        }
    }

    // Compile-time validation of a handler list: every entry must be a
    // function pointer or delegate, and a void handler must not hide a later
    // one (by taking a lone Variant or by repeating a parameter list).
    void checkops(T...)(T ops)
    {
        foreach (idx, Handler; T)
        {
            static assert(isFunctionPointer!Handler || isDelegate!Handler);
            alias HandlerArgs = Parameters!(Handler);
            alias HandlerRet = ReturnType!(Handler);

            static if (idx < T.length - 1 && is(HandlerRet == void))
            {
                static assert(HandlerArgs.length != 1 || !is(HandlerArgs[0] == Variant),
                              "function with arguments " ~ HandlerArgs.stringof ~
                              " occludes successive function");

                foreach (Later; T[idx + 1 .. $])
                {
                    static assert(isFunctionPointer!Later || isDelegate!Later);
                    alias LaterArgs = Parameters!(Later);

                    static assert(!is(HandlerArgs == LaterArgs),
                                  "function with arguments " ~ HandlerArgs.stringof ~ " occludes successive function");
                }
            }
        }
    }

    // ThreadInfo of the caller: taken from the scheduler when one is set,
    // otherwise the thread-local default instance.
    @property ref ThreadInfo thisInfo() nothrow
    {
        if (scheduler !is null)
            return scheduler.thisInfo;
        return ThreadInfo.thisInfo;
    }
}

// Thread-local destructor: runs ThreadInfo.cleanup for the exiting thread.
static ~this()
{
    thisInfo.cleanup();
}

// Exceptions

/**
 * Thrown on calls to $(D receiveOnly) if a message other than the type
 * the receiving thread expected is sent.
 */
class MessageMismatch : Exception
{
    ///
    this(string msg = "Unexpected message type") @safe pure nothrow @nogc
    {
        super(msg);
    }
}

/**
 * Thrown on calls to $(D receive) if the thread that spawned the receiving
 * thread has terminated and no more messages exist.
 */
class OwnerTerminated : Exception
{
    ///
    this(Tid t, string msg = "Owner terminated") @safe pure nothrow @nogc
    {
        super(msg);
        tid = t;
    }

    // Tid handed to the constructor.
    Tid tid;
}

/**
 * Thrown if a linked thread has terminated.
 */
class LinkTerminated : Exception
{
    ///
    this(Tid t, string msg = "Link terminated") @safe pure nothrow @nogc
    {
        super(msg);
        tid = t;
    }

    // Tid handed to the constructor.
    Tid tid;
}

/**
 * Thrown if a message was sent to a thread via
 * $(REF prioritySend, std,concurrency) and the receiver does not have a handler
 * for a message of this type.
 */
class PriorityMessageException : Exception
{
    ///
    this(Variant vals)
    {
        super("Priority message");
        message = vals;
    }

    /**
     * The message that was sent.
     */
    Variant message;
}

/**
 * Thrown on mailbox crowding if the mailbox is configured with
 * $(D OnCrowding.throwException).
*/
class MailboxFull : Exception
{
    ///
    this(Tid t, string msg = "Mailbox full") @safe pure nothrow @nogc
    {
        super(msg);
        tid = t;
    }

    // Tid handed to the constructor.
    Tid tid;
}

/**
 * Thrown when a Tid is missing, e.g. when $(D ownerTid) doesn't
 * find an owner thread.
 */
class TidMissingException : Exception
{
    import std.exception : basicExceptionCtors;
    ///
    mixin basicExceptionCtors;
}


// Thread ID


/**
 * An opaque type used to represent a logical thread.
 */
struct Tid
{
private:
    // Wraps an existing mailbox; only this module can construct a Tid this way.
    this(MessageBox m) @safe pure nothrow @nogc
    {
        mbox = m;
    }

    MessageBox mbox;

public:

    /**
     * Generate a convenient string for identifying this Tid. This is only
     * useful to see if Tid's that are currently executing are the same or
     * different, e.g. for logging and debugging. It is potentially possible
     * that a Tid executed in the future will have the same toString() output
     * as another Tid that has already terminated.
     */
    void toString(scope void delegate(const(char)[]) sink)
    {
        import std.format : formattedWrite;

        // The mailbox reference is printed as a hexadecimal address.
        sink.formattedWrite("Tid(%x)", cast(void*) mbox);
    }

}

@system unittest
{
    // text!Tid is @system
    import std.conv : text;

    // A default-initialized Tid has no mailbox and prints as zero.
    Tid blank;
    assert(text(blank) == "Tid(0)");

    // The calling thread's Tid has a mailbox.
    auto mine = thisTid;
    assert(text(mine) != "Tid(0)");

    // Copies print identically.
    auto copy = mine;
    assert(text(mine) == text(copy));
}

/**
 * Returns: The $(LREF Tid) of the caller's thread.
 */
@property Tid thisTid() @safe
{
    // TODO: remove when concurrency is safe
    static auto lazyIdent() @trusted
    {
        // Create the mailbox on first use, then keep returning the same Tid.
        if (thisInfo.ident == Tid.init)
            thisInfo.ident = Tid(new MessageBox);
        return thisInfo.ident;
    }

    return lazyIdent();
}

/**
 * Return the Tid of the thread which spawned the caller's thread.
*
 * Throws: A $(D TidMissingException) exception if
 * there is no owner thread.
 */
@property Tid ownerTid()
{
    import std.exception : enforce;

    // An owner Tid without a mailbox means no owner was ever recorded.
    const hasOwner = thisInfo.owner.mbox !is null;
    enforce!TidMissingException(hasOwner, "Error: Thread has no owner thread.");
    return thisInfo.owner;
}

@system unittest
{
    import std.exception : assertThrown;

    // Child: expects a greeting and replies to whoever spawned it.
    static void fun()
    {
        string greeting = receiveOnly!string();
        assert(greeting == "Main calling");
        ownerTid.send("Child responding");
    }

    // The thread running this test has no owner.
    assertThrown!TidMissingException(ownerTid);

    auto child = spawn(&fun);
    child.send("Main calling");
    string reply = receiveOnly!string();
    assert(reply == "Child responding");
}

// Thread Creation

// True if F is a void-returning callable that can be invoked with arguments
// of types T and is either a function pointer or free of unshared aliasing.
private template isSpawnable(F, T...)
{
    // Walks both parameter lists in step, starting at index i, checking that
    // each parameter of F2 converts implicitly to the matching one of F1.
    template isParamsImplicitlyConvertible(F1, F2, int i = 0)
    {
        alias wanted = Parameters!F1;
        alias given = Parameters!F2;

        static if (wanted.length != given.length)
            enum isParamsImplicitlyConvertible = false;
        else static if (wanted.length == i)
            enum isParamsImplicitlyConvertible = true;
        else static if (!isImplicitlyConvertible!(given[i], wanted[i]))
            enum isParamsImplicitlyConvertible = false;
        else
            enum isParamsImplicitlyConvertible = isParamsImplicitlyConvertible!(F1,
                    F2, i + 1);
    }

    enum isSpawnable = isCallable!F && is(ReturnType!F == void)
            && isParamsImplicitlyConvertible!(F, void function(T))
            && (isFunctionPointer!F || !hasUnsharedAliasing!F);
}

/**
 * Starts fn(args) in a new logical thread.
 *
 * Executes the supplied function in a new logical thread represented by
 * $(D Tid). The calling thread is designated as the owner of the new thread.
 * When the owner thread terminates an $(D OwnerTerminated) message will be
 * sent to the new thread, causing an $(D OwnerTerminated) exception to be
 * thrown on $(D receive()).
 *
 * Params:
 *  fn = The function to execute.
*  args = Arguments to the function.
 *
 * Returns:
 *  A Tid representing the new logical thread.
 *
 * Notes:
 *  $(D args) must not have unshared aliasing. In other words, all arguments
 *  to $(D fn) must either be $(D shared) or $(D immutable) or have no
 *  pointer indirection. This is necessary for enforcing isolation among
 *  threads.
 *
 * Example:
 * ---
 * import std.stdio, std.concurrency;
 *
 * void f1(string str)
 * {
 *     writeln(str);
 * }
 *
 * void f2(char[] str)
 * {
 *     writeln(str);
 * }
 *
 * void main()
 * {
 *     auto str = "Hello, world";
 *
 *     // Works: string is immutable.
 *     auto tid1 = spawn(&f1, str);
 *
 *     // Fails: char[] has mutable aliasing.
 *     auto tid2 = spawn(&f2, str.dup);
 *
 *     // New thread with anonymous function
 *     spawn({ writeln("This is so great!"); });
 * }
 * ---
 */
Tid spawn(F, T...)(F fn, T args) if (isSpawnable!(F, T))
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    // A plain spawn records the child as unlinked.
    enum bool linked = false;
    return _spawn(linked, fn, args);
}

/**
 * Starts fn(args) in a logical thread and will receive a LinkTerminated
 * message when the operation terminates.
 *
 * Executes the supplied function in a new logical thread represented by
 * Tid. This new thread is linked to the calling thread so that if either
 * it or the calling thread terminates a LinkTerminated message will be sent
 * to the other, causing a LinkTerminated exception to be thrown on receive().
 * The owner relationship from spawn() is preserved as well, so if the link
 * between threads is broken, owner termination will still result in an
 * OwnerTerminated exception to be thrown on receive().
 *
 * Params:
 *  fn = The function to execute.
 *  args = Arguments to the function.
 *
 * Returns:
 *  A Tid representing the new thread.
*/
Tid spawnLinked(F, T...)(F fn, T args) if (isSpawnable!(F, T))
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    // Same as spawn, except the child is recorded as linked.
    enum bool linked = true;
    return _spawn(linked, fn, args);
}

/*
 * Shared implementation of spawn and spawnLinked: creates the child's
 * mailbox, starts fn(args) through the scheduler (or on a new Thread when no
 * scheduler is set) and records the child in the caller's link table.
 */
private Tid _spawn(F, T...)(bool linked, F fn, T args) if (isSpawnable!(F, T))
{
    // TODO: MessageList and &exec should be shared.
    auto childTid = Tid(new MessageBox);
    auto parentTid = thisTid;

    // Entry point of the new logical thread: publish identity and owner
    // before running the user's function.
    void entry()
    {
        thisInfo.ident = childTid;
        thisInfo.owner = parentTid;
        fn(args);
    }

    // TODO: MessageList and &exec should be shared.
    if (scheduler is null)
    {
        auto worker = new Thread(&entry);
        worker.start();
    }
    else
    {
        scheduler.spawn(&entry);
    }

    thisInfo.links[childTid] = linked;
    return childTid;
}

@system unittest
{
    // Function pointers: the argument list must match the parameter list.
    void function() fn1;
    void function(int) fn2;
    static assert(__traits(compiles, spawn(fn1)));
    static assert(__traits(compiles, spawn(fn2, 2)));
    static assert(!__traits(compiles, spawn(fn1, 1)));
    static assert(!__traits(compiles, spawn(fn2)));

    // Delegates: only shared or immutable ones are accepted.
    void delegate(int) shared dg1;
    shared(void delegate(int)) dg2;
    shared(void delegate(long) shared) dg3;
    shared(void delegate(real, int, long) shared) dg4;
    void delegate(int) immutable dg5;
    void delegate(int) dg6;
    static assert(__traits(compiles, spawn(dg1, 1)));
    static assert(__traits(compiles, spawn(dg2, 2)));
    static assert(__traits(compiles, spawn(dg3, 3)));
    static assert(__traits(compiles, spawn(dg4, 4, 4, 4)));
    static assert(__traits(compiles, spawn(dg5, 5)));
    static assert(!__traits(compiles, spawn(dg6, 6)));

    // Objects with opCall: the object's qualifier must allow calling opCall.
    auto callable1 = new class{ void opCall(int) shared {} };
    auto callable2 = cast(shared) new class{ void opCall(int) shared {} };
    auto callable3 = new class{ void opCall(int) immutable {} };
    auto callable4 = cast(immutable) new class{ void opCall(int) immutable {} };
    auto callable5 = new class{ void opCall(int) {} };
    auto callable6 = cast(shared) new class{ void opCall(int) immutable {} };
    auto callable7 = cast(immutable) new class{ void opCall(int) shared {} };
    auto callable8 = cast(shared) new class{ void opCall(int) const shared {} };
    auto callable9 = cast(const shared) new class{ void opCall(int) shared {} };
    auto callable10 = cast(const shared) new class{ void opCall(int) const shared {} };
    auto callable11 = cast(immutable) new class{ void opCall(int) const shared {} };
    static assert(!__traits(compiles, spawn(callable1, 1)));
    static assert( __traits(compiles, spawn(callable2, 2)));
    static assert(!__traits(compiles, spawn(callable3, 3)));
    static assert( __traits(compiles, spawn(callable4, 4)));
    static assert(!__traits(compiles, spawn(callable5, 5)));
    static assert(!__traits(compiles, spawn(callable6, 6)));
    static assert(!__traits(compiles, spawn(callable7, 7)));
    static assert( __traits(compiles, spawn(callable8, 8)));
    static assert(!__traits(compiles, spawn(callable9, 9)));
    static assert( __traits(compiles, spawn(callable10, 10)));
    static assert( __traits(compiles, spawn(callable11, 11)));
}

/**
 * Places the values as a message at the back of tid's message queue.
 *
 * Sends the supplied value to the thread represented by tid. As with
 * $(REF spawn, std,concurrency), $(D T) must not have unshared aliasing.
 */
void send(T...)(Tid tid, T vals)
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    // Ordinary sends are queued as standard messages.
    _send(MsgType.standard, tid, vals);
}

/**
 * Places the values as a message on the front of tid's message queue.
 *
 * Send a message to $(D tid) but place it at the front of $(D tid)'s message
 * queue instead of at the back. This function is typically used for
 * out-of-band communication, to signal exceptional conditions, etc.
*/
void prioritySend(T...)(Tid tid, T vals)
{
    static assert(!hasLocalAliasing!(T), "Aliases to mutable thread-local data not allowed.");

    enum kind = MsgType.priority;
    _send(kind, tid, vals);
}

/*
 * Unchecked send of a standard message; performs no aliasing validation.
 */
private void _send(T...)(Tid tid, T vals)
{
    enum kind = MsgType.standard;
    _send(kind, tid, vals);
}

/*
 * Implementation of send. This allows parameter checking to be different for
 * both Tid.send() and .send().
 *
 * Wraps the values in a Message of the given type and puts it into the
 * mailbox of tid.
 */
private void _send(T...)(MsgType type, Tid tid, T vals)
{
    auto packed = Message(type, vals);
    tid.mbox.put(packed);
}

/**
 * Receives a message from another thread.
 *
 * Receive a message from another thread, or block if no messages of the
 * specified types are available. This function works by pattern matching
 * a message against a set of delegates and executing the first match found.
 *
 * If a delegate that accepts a $(REF Variant, std,variant) is included as
 * the last argument to $(D receive), it will match any message that was not
 * matched by an earlier delegate. If more than one argument is sent,
 * the $(D Variant) will contain a $(REF Tuple, std,typecons) of all values
 * sent.
*
 * Example:
 * ---
 * import std.stdio;
 * import std.variant;
 * import std.concurrency;
 *
 * void spawnedFunction()
 * {
 *     receive(
 *         (int i) { writeln("Received an int."); },
 *         (float f) { writeln("Received a float."); },
 *         (Variant v) { writeln("Received some other type."); }
 *     );
 * }
 *
 * void main()
 * {
 *      auto tid = spawn(&spawnedFunction);
 *      send(tid, 42);
 * }
 * ---
 */
void receive(T...)( T ops )
in
{
    // The caller needs a mailbox before it can wait on one.
    assert(thisInfo.ident.mbox !is null,
           "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
body
{
    // Compile-time sanity check of the handler list.
    checkops( ops );

    auto mailbox = thisInfo.ident.mbox;
    mailbox.get( ops );
}


@safe unittest
{
    // A Variant handler is accepted alone or in last position.
    static assert( __traits( compiles,
                      {
                          receive( (Variant x) {} );
                          receive( (int x) {}, (Variant x) {} );
                      } ) );

    // A Variant handler placed first would hide the handlers after it.
    static assert( !__traits( compiles,
                       {
                           receive( (Variant x) {}, (int x) {} );
                       } ) );

    // Two handlers with the same parameter list are rejected.
    static assert( !__traits( compiles,
                       {
                           receive( (int x) {}, (int x) {} );
                       } ) );
}

// Make sure receive() works with free functions as well.
version (unittest)
{
    private void receiveFunction(int x) {}
}
@safe unittest
{
    static assert( __traits( compiles,
                      {
                          receive( &receiveFunction );
                          receive( &receiveFunction, (Variant x) {} );
                      } ) );
}


// Return type of receiveOnly: the type itself for exactly one type,
// otherwise a Tuple of all requested types.
private template receiveOnlyRet(T...)
{
    static if ( T.length != 1 )
    {
        import std.typecons : Tuple;
        alias receiveOnlyRet = Tuple!(T);
    }
    else
    {
        alias receiveOnlyRet = T[0];
    }
}

/**
 * Receives only messages with arguments of types $(D T).
 *
 * Throws:  $(D MessageMismatch) if a message of types other than $(D T)
 *          is received.
 *
 * Returns: The received message.  If $(D T.length) is greater than one,
 *          the message will be packed into a $(REF Tuple, std,typecons).
*
 * Example:
 * ---
 * import std.concurrency;
 *
 * void spawnedFunc()
 * {
 *     auto msg = receiveOnly!(int, string)();
 *     assert(msg[0] == 42);
 *     assert(msg[1] == "42");
 * }
 *
 * void main()
 * {
 *     auto tid = spawn(&spawnedFunc);
 *     send(tid, 42, "42");
 * }
 * ---
 */
receiveOnlyRet!(T) receiveOnly(T...)()
in
{
    // The caller needs a mailbox before it can wait on one.
    assert(thisInfo.ident.mbox !is null,
        "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
body
{
    import std.format : format;
    import std.typecons : Tuple;

    // Filled in by the first handler when a message of the expected types arrives.
    Tuple!(T) ret;

    thisInfo.ident.mbox.get(
        // Expected message: capture the values.
        (T val) {
            static if (T.length)
                ret.field = val;
        },
        // Termination notices are rethrown to the caller.
        (LinkTerminated e) { throw e; },
        (OwnerTerminated e) { throw e; },
        // Anything else is a type mismatch.
        (Variant val) {
            static if (T.length > 1)
                string expected = T.stringof;
            else
                string expected = T[0].stringof;

            throw new MessageMismatch(
                format("Unexpected message type: expected '%s', got '%s'", expected, val.type.toString()));
        });

    // A single value is returned bare, several as the Tuple.
    static if (T.length == 1)
        return ret[0];
    else
        return ret;
}

@system unittest
{
    // Worker: expects a string and reports to mainTid either an empty
    // string (success) or the message of whatever was thrown.
    static void t1(Tid mainTid)
    {
        try
        {
            receiveOnly!string();
            mainTid.send("");
        }
        catch (Throwable th)
        {
            mainTid.send(th.msg);
        }
    }

    auto worker = spawn(&t1, thisTid);
    worker.send(1); // an int where a string is expected
    string outcome = receiveOnly!string();
    assert(outcome == "Unexpected message type: expected 'string', got 'int'");
}

/**
 * Tries to receive but will give up if no matches arrive within duration.
 * Won't wait at all if provided $(REF Duration, core,time) is negative.
 *
 * Same as $(D receive) except that rather than wait forever for a message,
 * it waits until either it receives a message or the given
 * $(REF Duration, core,time) has passed. It returns $(D true) if it received a
 * message and $(D false) if it timed out waiting for one.
*/
bool receiveTimeout(T...)(Duration duration, T ops)
in
{
    // The caller needs a mailbox before it can wait on one.
    assert(thisInfo.ident.mbox !is null,
        "Cannot receive a message until a thread was spawned or thisTid was passed to a running thread.");
}
body
{
    // Compile-time sanity check of the handler list.
    checkops(ops);

    auto mailbox = thisInfo.ident.mbox;
    return mailbox.get(duration, ops);
}

@safe unittest
{
    // A Variant handler is accepted alone or in last position.
    static assert(__traits(compiles, {
        receiveTimeout(msecs(0), (Variant x) {});
        receiveTimeout(msecs(0), (int x) {}, (Variant x) {});
    }));

    // A Variant handler placed first would hide the handlers after it.
    static assert(!__traits(compiles, {
        receiveTimeout(msecs(0), (Variant x) {}, (int x) {});
    }));

    // Two handlers with the same parameter list are rejected.
    static assert(!__traits(compiles, {
        receiveTimeout(msecs(0), (int x) {}, (int x) {});
    }));

    static assert(__traits(compiles, {
        receiveTimeout(msecs(10), (int x) {}, (Variant x) {});
    }));
}

// MessageBox Limits

/**
 * These behaviors may be specified when a mailbox is full.
 */
enum OnCrowding
{
    block, /// Wait until room is available.
    throwException, /// Throw a MailboxFull exception.
    ignore /// Abort the send and return.
}

private
{
    // Handler installed for OnCrowding.block: always returns true.
    bool onCrowdingBlock(Tid tid) @safe pure nothrow @nogc
    {
        return true;
    }

    // Handler installed for OnCrowding.throwException: throws MailboxFull(tid).
    bool onCrowdingThrow(Tid tid) @safe pure
    {
        throw new MailboxFull(tid);
    }

    // Handler installed for OnCrowding.ignore: always returns false.
    bool onCrowdingIgnore(Tid tid) @safe pure nothrow @nogc
    {
        return false;
    }
}

/**
 * Sets a maximum mailbox size.
 *
 * Sets a limit on the maximum number of user messages allowed in the mailbox.
 * If this limit is reached, the caller attempting to add a new message will
 * execute the behavior specified by doThis. If messages is zero, the mailbox
 * is unbounded.
 *
 * Params:
 *  tid = The Tid of the thread for which this limit should be set.
 *  messages = The maximum number of messages or zero if no limit.
 *  doThis = The behavior executed when a message is sent to a full
 *  mailbox.
*/
void setMaxMailboxSize(Tid tid, size_t messages, OnCrowding doThis) @safe pure
{
    // Each policy maps onto one of the private onCrowding* handlers.
    final switch (doThis)
    {
    case OnCrowding.block:
        tid.mbox.setMaxMsgs(messages, &onCrowdingBlock);
        return;
    case OnCrowding.throwException:
        tid.mbox.setMaxMsgs(messages, &onCrowdingThrow);
        return;
    case OnCrowding.ignore:
        tid.mbox.setMaxMsgs(messages, &onCrowdingIgnore);
        return;
    }
}

/**
 * Sets a maximum mailbox size.
 *
 * Sets a limit on the maximum number of user messages allowed in the mailbox.
 * If this limit is reached, the caller attempting to add a new message will
 * execute onCrowdingDoThis. If messages is zero, the mailbox is unbounded.
 *
 * Params:
 *  tid = The Tid of the thread for which this limit should be set.
 *  messages = The maximum number of messages or zero if no limit.
 *  onCrowdingDoThis = The routine called when a message is sent to a full
 *  mailbox.
 */
void setMaxMailboxSize(Tid tid, size_t messages, bool function(Tid) onCrowdingDoThis)
{
    auto mailbox = tid.mbox;
    mailbox.setMaxMsgs(messages, onCrowdingDoThis);
}

private
{
    // Name registry, kept in both directions. Every access in this module
    // is made while holding registryLock.
    __gshared Tid[string] tidByName;
    __gshared string[][Tid] namesByTid;
}

// Mutex guarding the registry tables; set up through initOnce.
private @property Mutex registryLock()
{
    __gshared Mutex impl;
    initOnce!impl(new Mutex);
    return impl;
}

// Drops every name registered for the calling thread's Tid.
private void unregisterMe()
{
    auto me = thisInfo.ident;

    // A thread that never obtained a Tid cannot have registered a name.
    if (me == Tid.init)
        return;

    synchronized (registryLock)
    {
        auto ownNames = me in namesByTid;
        if (ownNames is null)
            return;

        foreach (name; *ownNames)
            tidByName.remove(name);
        namesByTid.remove(me);
    }
}

/**
 * Associates name with tid.
 *
 * Associates name with tid in a process-local map. When the thread
 * represented by tid terminates, any names associated with it will be
 * automatically unregistered.
 *
 * Params:
 *  name = The name to associate with tid.
 *  tid = The Tid to register under name.
*
 * Returns:
 *  true if the name is available and tid is not known to represent a
 *  defunct thread.
 */
bool register(string name, Tid tid)
{
    synchronized (registryLock)
    {
        // Refuse names that are already taken and mailboxes that are closed.
        if (name in tidByName)
            return false;
        if (tid.mbox.isClosed)
            return false;

        // Record the association in both directions.
        namesByTid[tid] ~= name;
        tidByName[name] = tid;
        return true;
    }
}

/**
 * Removes the registered name associated with a tid.
 *
 * Params:
 *  name = The name to unregister.
 *
 * Returns:
 *  true if the name is registered, false if not.
 */
bool unregister(string name)
{
    import std.algorithm.mutation : remove, SwapStrategy;
    import std.algorithm.searching : countUntil;

    synchronized (registryLock)
    {
        if (auto tid = name in tidByName)
        {
            // Drop the name from the owner's reverse list as well.
            if (auto allNames = *tid in namesByTid)
            {
                auto pos = countUntil(*allNames, name);

                // remove() only returns the shortened slice; it must be
                // stored back, otherwise the list keeps its old length and
                // a stale name, which unregisterMe would later erase from
                // tidByName even if another thread had re-registered it.
                if (pos >= 0)
                    *allNames = remove!(SwapStrategy.unstable)(*allNames, pos);
            }
            tidByName.remove(name);
            return true;
        }
        return false;
    }
}

/**
 * Gets the Tid associated with name.
 *
 * Params:
 *  name = The name to locate within the registry.
 *
 * Returns:
 *  The associated Tid or Tid.init if name is not registered.
 */
Tid locate(string name)
{
    synchronized (registryLock)
    {
        if (auto tid = name in tidByName)
            return *tid;
        return Tid.init;
    }
}

/**
 * Encapsulates all implementation-level data needed for scheduling.
 *
 * When defining a Scheduler, an instance of this struct must be associated
 * with each logical thread. It contains all implementation-level information
 * needed by the internal API.
 */
struct ThreadInfo
{
    Tid ident;
    bool[Tid] links;
    Tid owner;

    /**
     * Gets a thread-local instance of ThreadInfo.
     *
     * Gets a thread-local instance of ThreadInfo, which should be used as the
     * default instance when info is requested for a thread not created by the
     * Scheduler.
     */
    static @property ref thisInfo() nothrow
    {
        static ThreadInfo val;
        return val;
    }

    /**
     * Cleans up this ThreadInfo.
     *
     * This must be called when a scheduled thread terminates. It tears down
     * the messaging system for the thread and notifies interested parties of
     * the thread's termination.
     */
    void cleanup()
    {
        // Close our own mailbox first, if one was ever created.
        if (ident.mbox !is null)
            ident.mbox.close();

        // Send a linkDead message to every Tid in the link table, then to the owner.
        foreach (tid; links.keys)
            _send(MsgType.linkDead, tid, ident);
        if (owner != Tid.init)
            _send(MsgType.linkDead, owner, ident);

        unregisterMe(); // clean up registry entries
    }
}

/**
 * A Scheduler controls how threading is performed by spawn.
 *
 * Implementing a Scheduler allows the concurrency mechanism used by this
 * module to be customized according to different needs. By default, a call
 * to spawn will create a new kernel thread that executes the supplied routine
 * and terminates when finished. But it is possible to create Schedulers that
 * reuse threads, that multiplex Fibers (coroutines) across a single thread,
 * or any number of other approaches. By making the choice of Scheduler a
 * user-level option, std.concurrency may be used for far more types of
 * application than if this behavior were predefined.
*
 * Example:
 * ---
 * import std.concurrency;
 * import std.stdio;
 *
 * void main()
 * {
 *     scheduler = new FiberScheduler;
 *     scheduler.start(
 *     {
 *         writeln("the rest of main goes here");
 *     });
 * }
 * ---
 *
 * Some schedulers have a dispatching loop that must run if they are to work
 * properly, so for the sake of consistency, when using a scheduler, start()
 * must be called within main(). This yields control to the scheduler and
 * will ensure that any spawned threads are executed in an expected manner.
 */
interface Scheduler
{
    /**
     * Spawns the supplied op and starts the Scheduler.
     *
     * Meant to be called once, at program start, to hand all scheduling over
     * to the active Scheduler instance. Schedulers that dispatch threads
     * themselves, rather than relying on the operating system, need this,
     * so start should always be called within main() to begin normal
     * program execution.
     *
     * Params:
     *  op = A wrapper for whatever the main thread would have done in the
     *       absence of a custom scheduler. It will be automatically executed
     *       via a call to spawn by the Scheduler.
     */
    void start(void delegate() op);

    /**
     * Assigns a logical thread to execute the supplied op.
     *
     * Called by spawn. The implementation is expected to create a new
     * logical thread and run the supplied operation on it. If that thread is
     * not a kernel thread it must call thisInfo.cleanup() when it
     * terminates; kernel threads have their ThreadInfo cleaned up
     * automatically by a thread-local destructor.
     *
     * Params:
     *  op = The function to execute. This may be the actual function passed
     *       by the user to spawn itself, or may be a wrapper function.
     */
    void spawn(void delegate() op);

    /**
     * Yields execution to another logical thread.
     *
     * Called at various points within concurrency-aware APIs to give the
     * scheduler a chance to yield execution under a cooperative
     * multithreading model. Where that is not appropriate, such as when each
     * logical thread is backed by a dedicated kernel thread, this routine
     * may be a no-op.
     */
    void yield() nothrow;

    /**
     * Returns an appropriate ThreadInfo instance.
     *
     * Returns an instance of ThreadInfo specific to the logical thread that
     * is calling this routine or, if the calling thread was not created by
     * this scheduler, returns ThreadInfo.thisInfo instead.
     */
    @property ref ThreadInfo thisInfo() nothrow;

    /**
     * Creates a Condition variable analog for signaling.
     *
     * Creates a new Condition variable analog which is used to check for and
     * to signal the addition of messages to a thread's message queue. Like
     * yield, some schedulers may need to define custom behavior so that calls
     * to Condition.wait() yield to another thread when no new messages are
     * available instead of blocking.
     *
     * Params:
     *  m = The Mutex that will be associated with this condition. It will be
     *      locked prior to any operation on the condition, and so in some
     *      cases a Scheduler may need to hold this reference and unlock the
     *      mutex before yielding execution to another logical thread.
     */
    Condition newCondition(Mutex m) nothrow;
}

/**
 * An example Scheduler using kernel threads.
 *
 * This is an example Scheduler that mirrors the default scheduling behavior
 * of creating one kernel thread per call to spawn. It is fully functional
 * and may be instantiated and used, but is not a necessary part of the
 * default functioning of this module.
*/
class ThreadScheduler : Scheduler
{
    /**
     * This simply runs op directly, since no real scheduling is needed by
     * this approach.
     */
    void start(void delegate() op)
    {
        op();
    }

    /**
     * Creates a new kernel thread and assigns it to run the supplied op.
     *
     * The Thread object is started immediately and is not retained by the
     * scheduler.
     */
    void spawn(void delegate() op)
    {
        auto t = new Thread(op);
        t.start();
    }

    /**
     * This scheduler does no explicit multiplexing, so this is a no-op.
     */
    void yield() nothrow
    {
        // no explicit yield needed
    }

    /**
     * Returns ThreadInfo.thisInfo, since it is a thread-local instance of
     * ThreadInfo, which is the correct behavior for this scheduler.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        return ThreadInfo.thisInfo;
    }

    /**
     * Creates a new Condition variable. No custom behavior is needed here.
     */
    Condition newCondition(Mutex m) nothrow
    {
        return new Condition(m);
    }
}

/**
 * An example Scheduler using Fibers.
 *
 * This is an example scheduler that creates a new Fiber per call to spawn
 * and multiplexes the execution of all fibers within the main thread.
 */
class FiberScheduler : Scheduler
{
    /**
     * This creates a new Fiber for the supplied op and then starts the
     * dispatcher.
     *
     * The call returns once dispatch() has run every scheduled fiber to
     * completion.
     */
    void start(void delegate() op)
    {
        create(op);
        dispatch();
    }

    /**
     * This creates a new Fiber for the supplied op and adds it to the
     * dispatch list, then yields the calling fiber (if any).
     */
    void spawn(void delegate() op) nothrow
    {
        create(op);
        yield();
    }

    /**
     * If the caller is a scheduled Fiber, this yields execution to another
     * scheduled Fiber.
     */
    void yield() nothrow
    {
        // NOTE: It's possible that we should test whether the calling Fiber
        //       is an InfoFiber before yielding, but I think it's reasonable
        //       that any (non-Generator) fiber should yield here.
        if (Fiber.getThis())
            Fiber.yield();
    }

    /**
     * Returns an appropriate ThreadInfo instance.
     *
     * Returns a ThreadInfo instance specific to the calling Fiber if the
     * Fiber was created by this dispatcher, otherwise it returns
     * ThreadInfo.thisInfo.
     */
    @property ref ThreadInfo thisInfo() nothrow
    {
        // The cast yields null when the caller is not running inside an
        // InfoFiber (including when it is not running inside a fiber at all).
        auto f = cast(InfoFiber) Fiber.getThis();

        if (f !is null)
            return f.info;
        return ThreadInfo.thisInfo;
    }

    /**
     * Returns a Condition analog that yields when wait or notify is called.
     */
    Condition newCondition(Mutex m) nothrow
    {
        return new FiberCondition(m);
    }

private:
    /*
     * A Fiber that carries its own ThreadInfo, which thisInfo hands out to
     * code running inside the fiber.
     */
    static class InfoFiber : Fiber
    {
        ThreadInfo info;

        this(void delegate() op) nothrow
        {
            super(op);
        }
    }

    /*
     * Condition replacement for fibers: waiting is implemented by yielding
     * until the `notified` flag has been set, rather than by blocking.
     */
    class FiberCondition : Condition
    {
        this(Mutex m) nothrow
        {
            super(m);
            notified = false;
        }

        /*
         * Keeps switching context until notified is set. The flag is
         * cleared again when this call leaves.
         */
        override void wait() nothrow
        {
            scope (exit) notified = false;

            while (!notified)
                switchContext();
        }

        /*
         * Yields until notified is set or the period, recomputed against a
         * MonoTime deadline after each yield, has become negative.
         *
         * Unlike wait(), this path calls yield() directly and therefore does
         * not unlock the mutex around the yield.
         *
         * Returns:
         *  The value of notified when the loop ended; the flag itself is
         *  cleared on exit.
         */
        override bool wait(Duration period) nothrow
        {
            import core.time : MonoTime;

            scope (exit) notified = false;

            for (auto limit = MonoTime.currTime + period;
                 !notified && !period.isNegative;
                 period = limit - MonoTime.currTime)
            {
                yield();
            }
            return notified;
        }

        /* Sets the notified flag and switches context. */
        override void notify() nothrow
        {
            notified = true;
            switchContext();
        }

        /* Same as notify(): there is a single flag, not a waiter list. */
        override void notifyAll() nothrow
        {
            notified = true;
            switchContext();
        }

    private:
        /*
         * Unlocks the associated mutex, yields, and re-locks the mutex when
         * control comes back (also if yield exits abnormally, because the
         * re-lock is a scope guard).
         */
        void switchContext() nothrow
        {
            mutex_nothrow.unlock_nothrow();
            scope (exit) mutex_nothrow.lock_nothrow();
            yield();
        }

        private bool notified;
    }

private:
    /*
     * Resumes the fibers in m_fibers one after another until none are left.
     *
     * A Throwable returned by a fiber is rethrown here unless it is an
     * OwnerTerminated. Fibers that have reached State.TERM are removed from
     * the array; m_pos wraps back to zero when it runs off the end.
     */
    void dispatch()
    {
        import std.algorithm.mutation : remove;

        while (m_fibers.length > 0)
        {
            auto t = m_fibers[m_pos].call(Fiber.Rethrow.no);
            if (t !is null && !(cast(OwnerTerminated) t))
            {
                throw t;
            }
            if (m_fibers[m_pos].state == Fiber.State.TERM)
            {
                // After removal m_pos already refers to the next fiber, so
                // it only needs resetting when it fell off the end.
                if (m_pos >= (m_fibers = remove(m_fibers, m_pos)).length)
                    m_pos = 0;
            }
            else if (m_pos++ >= m_fibers.length - 1)
            {
                m_pos = 0;
            }
        }
    }

    /*
     * Wraps op so that thisInfo.cleanup() runs when op finishes (normally or
     * by throwing) and appends a new InfoFiber for it to m_fibers.
     */
    void create(void delegate() op) nothrow
    {
        void wrap()
        {
            scope (exit)
            {
                thisInfo.cleanup();
            }
            op();
        }

        m_fibers ~= new InfoFiber(&wrap);
    }

private:
    Fiber[] m_fibers; // fibers still scheduled for execution
    size_t m_pos;     // index into m_fibers of the fiber dispatch() resumes next
}

// Checks the wait/notify hand-off of the Condition returned by
// FiberScheduler.newCondition using two plain fibers.
@system unittest
{
    static void receive(Condition cond, ref size_t received)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                cond.wait();
                ++received;
            }
        }
    }

    static void send(Condition cond, ref size_t sent)
    {
        while (true)
        {
            synchronized (cond.mutex)
            {
                ++sent;
                cond.notify();
            }
        }
    }

    auto fs = new FiberScheduler;
    auto mtx = new Mutex;
    auto cond = fs.newCondition(mtx);

    size_t received, sent;
    auto waiter = new Fiber({ receive(cond, received); }), notifier = new Fiber({ send(cond, sent); });
    waiter.call();
    assert(received == 0);
    notifier.call();
    assert(sent == 1);
    assert(received == 0);
    waiter.call();
    assert(received == 1);
    waiter.call();
    assert(received == 1);
}

/**
 * Sets the Scheduler behavior within the program.
 *
 * This variable sets the Scheduler behavior within this program. Typically,
 * when setting a Scheduler, scheduler.start() should be called in main.
* This routine will not return until program execution is complete.
 */
__gshared Scheduler scheduler;

// Generator

/**
 * Cooperative yield for non-Generator callers.
 *
 * When called from inside a Generator this does nothing. Otherwise it
 * forwards to scheduler.yield() if a scheduler is installed, or calls
 * Fiber.yield() if there is no scheduler and the caller is a Fiber.
 */
void yield() nothrow
{
    auto current = Fiber.getThis();

    // Generators only give up control through yield(T).
    if (cast(IsGenerator) current)
        return;

    if (scheduler !is null)
    {
        scheduler.yield();
        return;
    }

    // No scheduler installed: only a fiber has anything to yield to.
    if (current)
        Fiber.yield();
}

/// Used to determine whether a Generator is running.
private interface IsGenerator {}


/**
 * A Generator is a Fiber that periodically returns values of type T to the
 * caller via yield. This is represented as an InputRange.
 *
 * Example:
 * ---
 * import std.concurrency;
 * import std.stdio;
 *
 *
 * void main()
 * {
 *     auto tid = spawn(
 *     {
 *         while (true)
 *         {
 *             writeln(receiveOnly!int());
 *         }
 *     });
 *
 *     auto r = new Generator!int(
 *     {
 *         foreach (i; 1 .. 10)
 *             yield(i);
 *     });
 *
 *     foreach (e; r)
 *     {
 *         tid.send(e);
 *     }
 * }
 * ---
 */
class Generator(T) :
    Fiber, IsGenerator, InputRange!T
{
    /**
     * Builds a generator around a static D function and runs it once so
     * that the range is primed for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn)
    {
        super(fn);
        call();
    }

    /**
     * Builds a generator around a static D function, using the given stack
     * size, and runs it once so that the range is primed for iteration.
     *
     * Params:
     *  fn = The fiber function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  fn must not be null.
     */
    this(void function() fn, size_t sz)
    {
        super(fn, sz);
        call();
    }

    /**
     * Builds a generator around a dynamic D function (delegate) and runs it
     * once so that the range is primed for iteration.
     *
     * Params:
     *  dg = The fiber function.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg)
    {
        super(dg);
        call();
    }

    /**
     * Builds a generator around a dynamic D function (delegate), using the
     * given stack size, and runs it once so that the range is primed for
     * iteration.
     *
     * Params:
     *  dg = The fiber function.
     *  sz = The stack size for this fiber.
     *
     * In:
     *  dg must not be null.
     */
    this(void delegate() dg, size_t sz)
    {
        super(dg, sz);
        call();
    }

    /**
     * Returns true if the generator is empty, i.e. no value has been
     * yielded yet or the fiber has terminated.
     */
    final bool empty() @property
    {
        if (m_value is null)
            return true;
        return state == State.TERM;
    }

    /**
     * Resumes the underlying function so it can produce its next value.
     */
    final void popFront()
    {
        call();
    }

    /**
     * Returns the most recently generated value by shallow copy.
     */
    final T front() @property
    {
        return *m_value;
    }

    /**
     * Returns the most recently generated value without executing a
     * copy constructor. Will not compile for element types defining a
     * postblit, because Generator does not return by reference.
     */
    final T moveFront()
    {
        static if (hasElaborateCopyConstructor!T)
        {
            static assert(0,
                    "Fiber front is always rvalue and thus cannot be moved since it defines a postblit.");
        }
        else
        {
            return front;
        }
    }

    /**
     * foreach support: feeds each remaining value to loopBody and stops
     * early, without advancing, as soon as loopBody returns non-zero.
     */
    final int opApply(scope int delegate(T) loopBody)
    {
        int result;
        while (!empty)
        {
            result = loopBody(front);
            if (result)
                break;
            popFront();
        }
        return result;
    }

    /**
     * foreach support with a running index that starts at zero for this
     * call.
     */
    final int opApply(scope int delegate(size_t, T) loopBody)
    {
        int result;
        size_t index;
        while (!empty)
        {
            result = loopBody(index, front);
            if (result)
                break;
            ++index;
            popFront();
        }
        return result;
    }
private:
    T* m_value; // points at the value handed to the most recent yield(T)
}

/**
 * Yields a value of type T to the caller of the currently executing
 * generator.
 *
 * Params:
 *  value = The value to yield.
 *
 * Throws:
 *  Exception if the calling fiber is not an executing Generator!T.
 */
void yield(T)(ref T value)
{
    auto gen = cast(Generator!T) Fiber.getThis();
    if (gen is null || gen.state != Fiber.State.EXEC)
        throw new Exception("yield(T) called with no active generator for the supplied type");

    // The generator exposes the value through this pointer while suspended.
    gen.m_value = &value;
    Fiber.yield();
}

/// ditto
void yield(T)(T value)
{
    yield(value);
}

@system unittest
{
    import core.exception;
    import std.exception;

    static void testScheduler(Scheduler s)
    {
        scheduler = s;
        scheduler.start({
            auto tid = spawn({
                int i;

                try
                {
                    for (i = 1; i < 10; i++)
                    {
                        assertNotThrown!AssertError(assert(receiveOnly!int() == i));
                    }
                }
                catch (OwnerTerminated e)
                {

                }

                // i will advance 1 past the last value expected
                assert(i == 4);
            });

            auto r = new Generator!int({
                assertThrown!Exception(yield(2.0));
                yield(); // ensure this is a no-op
                yield(1);
                yield(); // also once something has been yielded
                yield(2);
                yield(3);
            });

            foreach (e; r)
            {
                tid.send(e);
            }
        });
        scheduler = null;
    }

    testScheduler(new ThreadScheduler);
    testScheduler(new FiberScheduler);
}
///
@system unittest
{
    import std.range;

    InputRange!int myIota = iota(10).inputRangeObject;

    myIota.popFront();
    myIota.popFront();
    assert(myIota.moveFront == 2);
    assert(myIota.front == 2);
    myIota.popFront();
    assert(myIota.front == 3);

    //can be assigned to std.range.interfaces.InputRange directly
    myIota = new Generator!int(
    {
        foreach (i; 0 .. 10) yield(i);
    });

    myIota.popFront();
    myIota.popFront();
    assert(myIota.moveFront == 2);
    assert(myIota.front == 2);
    myIota.popFront();
    assert(myIota.front == 3);

    size_t[2] counter = [0, 0];
    foreach (i, unused; myIota) counter[] += [1, i];

    assert(myIota.empty);
    assert(counter == [7, 21]);
}

private
{
    /*
     * A MessageBox is a message queue for one thread. Other threads may send
     * messages to this owner by calling put(), and the owner receives them by
     * calling get(). The put() call is therefore effectively shared and the
     * get() call is effectively local. setMaxMsgs may be used by any thread
     * to limit the size of the message queue.
*/
    class MessageBox
    {
        /*
         * Creates the lock and the two condition variables. The conditions
         * come from the installed scheduler when there is one, and are plain
         * Condition objects otherwise.
         */
        this() @trusted nothrow /* TODO: make @safe after relevant druntime PR gets merged */
        {
            m_lock = new Mutex;
            m_closed = false;

            if (scheduler is null)
            {
                m_putMsg = new Condition(m_lock);
                m_notFull = new Condition(m_lock);
            }
            else
            {
                m_putMsg = scheduler.newCondition(m_lock);
                m_notFull = scheduler.newCondition(m_lock);
            }
        }

        /// Returns the closed flag (set by close()), read under the lock.
        final @property bool isClosed() @safe @nogc pure
        {
            synchronized (m_lock)
            {
                return m_closed;
            }
        }

        /*
         * Sets a limit on the maximum number of user messages allowed in the
         * mailbox. If this limit is reached, the caller attempting to add
         * a new message will execute call. If num is zero, there is no limit
         * on the message queue.
         *
         * Params:
         *  num  = The maximum size of the queue or zero if the queue is
         *         unbounded.
         *  call = The routine to call when the queue is full.
         */
        final void setMaxMsgs(size_t num, bool function(Tid) call) @safe @nogc pure
        {
            synchronized (m_lock)
            {
                m_maxMsgs = num;
                m_onMaxMsgs = call;
            }
        }

        /*
         * If maxMsgs is not set, the message is added to the queue and the
         * owner is notified. If the queue is full, the message will still be
         * accepted if it is a control message, otherwise onCrowdingDoThis is
         * called. If the routine returns true, this call will block until
         * the owner has made space available in the queue. If it returns
         * false, this call will abort.
         *
         * Priority messages are always accepted and go to a separate queue.
         * Messages put into a closed mailbox are silently dropped.
         *
         * Params:
         *  msg = The message to put in the queue.
         *
         * Throws:
         *  An exception if the queue is full and onCrowdingDoThis throws.
         */
        final void put(ref Message msg)
        {
            synchronized (m_lock)
            {
                // TODO: Generate an error here if m_closed is true, or maybe
                //       put a message in the caller's queue?
                if (!m_closed)
                {
                    // NOTE(review): m_closed is only tested once, before this
                    // loop, and close() does not signal m_notFull, so a sender
                    // blocked below is not woken when the mailbox is closed.
                    // Confirm whether that is intended.
                    while (true)
                    {
                        if (isPriorityMsg(msg))
                        {
                            m_sharedPty.put(msg);
                            m_putMsg.notify();
                            return;
                        }
                        if (!mboxFull() || isControlMsg(msg))
                        {
                            m_sharedBox.put(msg);
                            m_putMsg.notify();
                            return;
                        }
                        // Queue is full: let the crowding handler decide
                        // whether to give up (false) or to wait for room.
                        if (m_onMaxMsgs !is null && !m_onMaxMsgs(thisTid))
                        {
                            return;
                        }
                        // m_putQueue counts senders blocked here; get() uses
                        // it to decide whether m_notFull needs signalling.
                        m_putQueue++;
                        m_notFull.wait();
                        m_putQueue--;
                    }
                }
            }
        }

        /*
         * Matches ops against each message in turn until a match is found.
         *
         * If the first argument is implicitly convertible to Duration it is
         * taken as a timeout and the remaining arguments are the ops.
         *
         * Params:
         *  ops = The operations to match. Each may return a bool to indicate
         *        whether a message with a matching type is truly a match.
         *
         * Returns:
         *  true if a message was retrieved and false if not (such as if a
         *  timeout occurred).
         *
         * Throws:
         *  LinkTerminated if a linked thread terminated, or OwnerTerminated
         *  if the owner thread terminates and no existing messages match the
         *  supplied ops.
         */
        bool get(T...)(scope T vals)
        {
            import std.meta : AliasSeq;

            static assert(T.length);

            static if (isImplicitlyConvertible!(T[0], Duration))
            {
                alias Ops = AliasSeq!(T[1 .. $]);
                alias ops = vals[1 .. $];
                enum timedWait = true;
                Duration period = vals[0];
            }
            else
            {
                alias Ops = AliasSeq!(T);
                alias ops = vals[0 .. $];
                enum timedWait = false;
            }

            // Tries the ops in order. The first op whose parameter types the
            // message converts to is invoked through msg.map: a bool-returning
            // op reports itself whether it matched, any other op counts as a
            // match.
            bool onStandardMsg(ref Message msg)
            {
                foreach (i, t; Ops)
                {
                    alias Args = Parameters!(t);
                    auto op = ops[i];

                    if (msg.convertsTo!(Args))
                    {
                        static if (is(ReturnType!(t) == bool))
                        {
                            return msg.map(op);
                        }
                        else
                        {
                            msg.map(op);
                            return true;
                        }
                    }
                }
                return false;
            }

            // Handles a linkDead message, whose payload is the Tid of the
            // thread that terminated. The matching exception is first offered
            // to the ops as a standard message and thrown if no op takes it.
            bool onLinkDeadMsg(ref Message msg)
            {
                assert(msg.convertsTo!(Tid));
                auto tid = msg.get!(Tid);

                if (bool* pDepends = tid in thisInfo.links)
                {
                    auto depends = *pDepends;
                    thisInfo.links.remove(tid);
                    // Give the owner relationship precedence.
                    if (depends && tid != thisInfo.owner)
                    {
                        auto e = new LinkTerminated(tid);
                        auto m = Message(MsgType.standard, e);
                        if (onStandardMsg(m))
                            return true;
                        throw e;
                    }
                }
                if (tid == thisInfo.owner)
                {
                    thisInfo.owner = Tid.init;
                    auto e = new OwnerTerminated(tid);
                    auto m = Message(MsgType.standard, e);
                    if (onStandardMsg(m))
                        return true;
                    throw e;
                }
                return false;
            }

            // Dispatches a control message by type; only linkDead is handled.
            bool onControlMsg(ref Message msg)
            {
                switch (msg.type)
                {
                case MsgType.linkDead:
                    return onLinkDeadMsg(msg);
                default:
                    return false;
                }
            }

            // Walks list from the front, removes the first message that gets
            // handled and returns true; returns false if nothing was handled.
            bool scan(ref ListT list)
            {
                for (auto range = list[]; !range.empty;)
                {
                    // Only the message handler will throw, so if this occurs
                    // we can be certain that the message was handled.
                    scope (failure)
                        list.removeAt(range);

                    if (isControlMsg(range.front))
                    {
                        if (onControlMsg(range.front))
                        {
                            // Although the linkDead message is a control message,
                            // it can be handled by the user. Since the linkDead
                            // message throws if not handled, if we get here then
                            // it has been handled and we can return from receive.
                            // This is a weird special case that will have to be
                            // handled in a more general way if more are added.
                            if (!isLinkDeadMsg(range.front))
                            {
                                list.removeAt(range);
                                continue;
                            }
                            list.removeAt(range);
                            return true;
                        }
                        range.popFront();
                        continue;
                    }
                    else
                    {
                        if (onStandardMsg(range.front))
                        {
                            list.removeAt(range);
                            return true;
                        }
                        range.popFront();
                        continue;
                    }
                }
                return false;
            }

            // Looks at the first priority message only. If an op handles it,
            // it is removed and true is returned. Otherwise its payload is
            // thrown (directly when it is a Throwable, else wrapped in a
            // PriorityMessageException). Returns false for an empty list.
            bool pty(ref ListT list)
            {
                if (!list.empty)
                {
                    auto range = list[];

                    if (onStandardMsg(range.front))
                    {
                        list.removeAt(range);
                        return true;
                    }
                    if (range.front.convertsTo!(Throwable))
                        throw range.front.get!(Throwable);
                    else if (range.front.convertsTo!(shared(Throwable)))
                        throw range.front.get!(shared(Throwable));
                    else
                        throw new PriorityMessageException(range.front.data);
                }
                return false;
            }

            static if (timedWait)
            {
                import core.time : MonoTime;
                auto limit = MonoTime.currTime + period;
            }

            while (true)
            {
                ListT arrived;

                // Local queues first; they can be read without the lock.
                if (pty(m_localPty) || scan(m_localBox))
                {
                    return true;
                }
                yield();
                synchronized (m_lock)
                {
                    updateMsgCount();
                    while (m_sharedPty.empty && m_sharedBox.empty)
                    {
                        // NOTE: We're notifying all waiters here instead of just
                        //       a few because the onCrowding behavior may have
                        //       changed and we don't want to block sender threads
                        //       unnecessarily if the new behavior is not to block.
                        //       This will admittedly result in spurious wakeups
                        //       in other situations, but what can you do?
                        if (m_putQueue && !mboxFull())
                            m_notFull.notifyAll();
                        static if (timedWait)
                        {
                            if (period <= Duration.zero || !m_putMsg.wait(period))
                                return false;
                        }
                        else
                        {
                            m_putMsg.wait();
                        }
                    }
                    // Move everything that arrived out of the shared queues
                    // so it can be examined after the lock is released.
                    m_localPty.put(m_sharedPty);
                    arrived.put(m_sharedBox);
                }
                if (m_localPty.empty)
                {
                    // Whatever scan() leaves in arrived is appended to the
                    // local box on every way out of this scope.
                    scope (exit) m_localBox.put(arrived);
                    if (scan(arrived))
                    {
                        return true;
                    }
                    else
                    {
                        static if (timedWait)
                        {
                            period = limit - MonoTime.currTime;
                        }
                        continue;
                    }
                }
                // A priority message arrived: keep the standard messages for
                // later and let pty() handle or throw the priority one.
                m_localBox.put(arrived);
                pty(m_localPty);
                return true;
            }
        }

        /*
         * Called on thread termination. This routine processes any remaining
         * control messages, clears out message queues, and sets a flag to
         * reject any future messages.
         */
        final void close()
        {
            // Drops the link to the terminated thread and clears the owner
            // if that thread was the owner. Nothing is thrown during close.
            static void onLinkDeadMsg(ref Message msg)
            {
                assert(msg.convertsTo!(Tid));
                auto tid = msg.get!(Tid);

                thisInfo.links.remove(tid);
                if (tid == thisInfo.owner)
                    thisInfo.owner = Tid.init;
            }

            // Applies onLinkDeadMsg to every linkDead message in list.
            static void sweep(ref ListT list)
            {
                for (auto range = list[]; !range.empty; range.popFront())
                {
                    if (range.front.type == MsgType.linkDead)
                        onLinkDeadMsg(range.front);
                }
            }

            ListT arrived;

            sweep(m_localBox);
            synchronized (m_lock)
            {
                arrived.put(m_sharedBox);
                m_closed = true;
            }
            m_localBox.clear();
            sweep(arrived);
        }

    private:
        // Routines involving local data only, no lock needed.

        // True when a limit is set and the local count recorded by
        // updateMsgCount() plus the shared box length has reached it.
        bool mboxFull() @safe @nogc pure nothrow
        {
            return m_maxMsgs && m_maxMsgs <= m_localMsgs + m_sharedBox.length;
        }

        // Records the current length of the local box in m_localMsgs.
        void updateMsgCount() @safe @nogc pure nothrow
        {
            m_localMsgs = m_localBox.length;
        }

        // A control message is anything that is neither standard nor priority.
        bool isControlMsg(ref Message msg) @safe @nogc pure nothrow
        {
            return msg.type != MsgType.standard && msg.type != MsgType.priority;
        }

        bool isPriorityMsg(ref Message msg) @safe @nogc pure nothrow
        {
            return msg.type == MsgType.priority;
        }

        bool isLinkDeadMsg(ref Message msg) @safe @nogc pure nothrow
        {
            return msg.type == MsgType.linkDead;
        }

        alias OnMaxFn = bool function(Tid);
        alias ListT = List!(Message);

        ListT m_localBox;     // standard/control messages already moved out of the shared box
        ListT m_localPty;     // priority messages already moved out of the shared queue

        Mutex m_lock;         // guards the shared queues, the limits and m_closed
        Condition m_putMsg;   // notified by put() after a message was queued
        Condition m_notFull;  // waited on by put() while the queue is full
        size_t m_putQueue;    // number of senders currently waiting on m_notFull
        ListT m_sharedBox;    // standard/control messages queued by put()
        ListT m_sharedPty;    // priority messages queued by put()
        OnMaxFn m_onMaxMsgs;  // crowding handler installed by setMaxMsgs
        size_t m_localMsgs;   // length of m_localBox as of the last updateMsgCount()
        size_t m_maxMsgs;     // queue limit, zero meaning unbounded
        bool m_closed;        // set by close()
    }

    /*
     * A singly linked list with append-only insertion, removal through a
     * Range position, and O(1) splicing of one list onto another.
     *
     * Removed nodes are pushed onto a static free list (sm_head), guarded by
     * a spin lock (sm_lock), and reused by newNode before anything new is
     * allocated.
     */
    struct List(T)
    {
        /*
         * Forward range over the list. It stores a pointer to the node
         * *before* the current element, which is what lets removeAt unlink
         * the current element.
         */
        struct Range
        {
            import std.exception : enforce;

            @property bool empty() const
            {
                return !m_prev.next;
            }

            @property ref T front()
            {
                enforce(m_prev.next, "invalid list node");
                return m_prev.next.val;
            }

            @property void front(T val)
            {
                enforce(m_prev.next, "invalid list node");
                m_prev.next.val = val;
            }

            void popFront()
            {
                enforce(m_prev.next, "invalid list node");
                m_prev = m_prev.next;
            }

            private this(Node* p)
            {
                m_prev = p;
            }

            private Node* m_prev;
        }

        /* Appends a copy of val, using a recycled node when one is available. */
        void put(T val)
        {
            put(newNode(val));
        }

        /*
         * Moves all nodes of rhs to the end of this list and leaves rhs
         * empty. Does nothing when rhs is empty.
         */
        void put(ref List!(T) rhs)
        {
            if (!rhs.empty)
            {
                // Link in the first node (this counts one element), then walk
                // to the real tail, counting the remaining nodes.
                put(rhs.m_first);
                while (m_last.next !is null)
                {
                    m_last = m_last.next;
                    m_count++;
                }
                rhs.m_first = null;
                rhs.m_last = null;
                rhs.m_count = 0;
            }
        }

        /*
         * Returns a Range positioned at the first element. The address of
         * m_first is reinterpreted as a Node* so that the head pointer can be
         * treated as the `next` field of a pseudo-node; this relies on `next`
         * being the first field of Node.
         */
        Range opSlice()
        {
            return Range(cast(Node*)&m_first);
        }

        /*
         * Unlinks the element r currently refers to (the node after
         * r.m_prev) and hands the node to freeNode. r itself stays usable and
         * then refers to the element that followed the removed one.
         */
        void removeAt(Range r)
        {
            import std.exception : enforce;

            assert(m_count);
            Node* n = r.m_prev;
            enforce(n && n.next, "attempting to remove invalid list node");

            if (m_last is m_first)
                m_last = null;
            else if (m_last is n.next)
                m_last = n; // nocoverage
            Node* to_free = n.next;
            n.next = n.next.next;
            freeNode(to_free);
            m_count--;
        }

        @property size_t length()
        {
            return m_count;
        }

        /* Forgets all nodes without returning them to the free list. */
        void clear()
        {
            m_first = m_last = null;
            m_count = 0;
        }

        @property bool empty()
        {
            return m_first is null;
        }

    private:
        struct Node
        {
            Node* next;
            T val;

            this(T v)
            {
                val = v;
            }
        }

        /* Minimal spin lock guarding the static free list. */
        static shared struct SpinLock
        {
            void lock() { while (!cas(&locked, false, true)) { Thread.yield(); } }
            void unlock() { atomicStore!(MemoryOrder.rel)(locked, false); }
            bool locked;
        }

        static shared SpinLock sm_lock;
        static shared Node* sm_head; // head of the free list of recycled nodes

        /*
         * Returns a node holding v: a node popped from the free list and
         * re-initialized in place if one is available, otherwise a newly
         * allocated one.
         */
        Node* newNode(T v)
        {
            Node* n;
            {
                sm_lock.lock();
                scope (exit) sm_lock.unlock();

                if (sm_head)
                {
                    n = cast(Node*) sm_head;
                    sm_head = sm_head.next;
                }
            }
            if (n)
            {
                import std.conv : emplace;
                emplace!Node(n, v);
            }
            else
            {
                n = new Node(v);
            }
            return n;
        }

        /* Destroys the node's value and pushes the node onto the free list. */
        void freeNode(Node* n)
        {
            // destroy val to free any owned GC memory
            destroy(n.val);

            sm_lock.lock();
            scope (exit) sm_lock.unlock();

            auto sn = cast(shared(Node)*) n;
            sn.next = sm_head;
            sm_head = sn;
        }

        /* Appends node n and counts it; n.next is left as it is. */
        void put(Node* n)
        {
            m_count++;
            if (!empty)
            {
                m_last.next = n;
                m_last = n;
                return;
            }
            m_first = n;
            m_last = n;
        }

        Node* m_first;
        Node* m_last;
        size_t m_count;
    }
}

version (unittest)
{
    import std.stdio;
    import std.typecons : tuple, Tuple;

    // Receiving side of simpleTest: consumes the four messages sent by
    // runTest and then reports back with a priority message.
    void testfn(Tid tid)
    {
        receive((float val) { assert(0); }, (int val, int val2) {
            assert(val == 42 && val2 == 86);
        });
        receive((Tuple!(int, int) val) { assert(val[0] == 42 && val[1] == 86); });
        receive((Variant val) {  });
        receive((string val) {
            if ("the quick brown fox" != val)
                return false;
            return true;
        }, (string val) { assert(false); });
        prioritySend(tid, "done");
    }

    // Sending side of simpleTest: sends the messages testfn expects and
    // waits for its "done" reply.
    void runTest(Tid tid)
    {
        send(tid, 42, 86);
        send(tid, tuple(42, 86));
        send(tid, "hello", "there");
        send(tid, "the quick brown fox");
        receive((string val) { assert(val == "done"); });
    }

    void simpleTest()
    {
        auto tid = spawn(&testfn, thisTid);
        runTest(tid);

        // Run the test again with a limited mailbox size.
        tid = spawn(&testfn, thisTid);
        setMaxMailboxSize(tid, 2, OnCrowding.block);
        runTest(tid);
    }

    @system unittest
    {
        simpleTest();
    }

    @system unittest
    {
        scheduler = new ThreadScheduler;
        simpleTest();
        scheduler = null;
    }
}

/*
 * Returns the Mutex shared by all initOnce calls that do not supply their
 * own, creating it on first use.
 *
 * The mutex is published with a compare-and-swap; a caller that loses that
 * race returns the mutex installed by the winner instead of its own.
 */
private @property Mutex initOnceLock()
{
    __gshared Mutex lock;
    if (auto mtx = cast() atomicLoad!(MemoryOrder.acq)(*cast(shared)&lock))
        return mtx;
    auto mtx = new Mutex;
    if (cas(cast(shared)&lock, cast(shared) null, cast(shared) mtx))
        return mtx;
    return cast() atomicLoad!(MemoryOrder.acq)(*cast(shared)&lock);
}

/**
 * Initializes $(D_PARAM var) with the lazy $(D_PARAM init) value in a
 * thread-safe manner.
 *
 * The implementation guarantees that all threads simultaneously calling
 * initOnce with the same $(D_PARAM var) argument block until $(D_PARAM var) is
 * fully initialized.
* All side-effects of $(D_PARAM init) are globally visible afterwards.
 *
 * Params:
 *   var = The variable to initialize
 *   init = The lazy initializer value
 *
 * Returns:
 *   A reference to the initialized variable
 */
auto ref initOnce(alias var)(lazy typeof(var) init)
{
    // Delegate to the mutex-taking overload using the module-wide lock.
    return initOnce!var(init, initOnceLock);
}

/// A typical use-case is to perform lazy but thread-safe initialization.
@system unittest
{
    static class MySingleton
    {
        static MySingleton instance()
        {
            static __gshared MySingleton inst;
            return initOnce!inst(new MySingleton);
        }
    }

    assert(MySingleton.instance !is null);
}

@system unittest
{
    static class MySingleton
    {
        static MySingleton instance()
        {
            static __gshared MySingleton inst;
            return initOnce!inst(new MySingleton);
        }

    private:
        this() { val = ++cnt; }
        size_t val;
        static __gshared size_t cnt;
    }

    foreach (_; 0 .. 10)
        spawn({ ownerTid.send(MySingleton.instance.val); });
    foreach (_; 0 .. 10)
        assert(receiveOnly!size_t == MySingleton.instance.val);
    assert(MySingleton.cnt == 1);
}

/**
 * Same as above, but takes a separate mutex instead of sharing one among
 * all initOnce instances.
 *
 * This should be used to avoid dead-locks when the $(D_PARAM init)
 * expression waits for the result of another thread that might also
 * call initOnce. Use with care.
 *
 * Params:
 *   var = The variable to initialize
 *   init = The lazy initializer value
 *   mutex = A mutex to prevent race conditions
 *
 * Returns:
 *   A reference to the initialized variable
 */
auto ref initOnce(alias var)(lazy typeof(var) init, Mutex mutex)
{
    import core.atomic : atomicLoad, atomicStore, MemoryOrder;

    // check that var is global, can't take address of a TLS variable
    static assert(is(typeof({ __gshared p = &var; })),
        "var must be 'static shared' or '__gshared'.");

    // One flag per instantiation, i.e. per initialized variable.
    static shared bool flag;

    // Fast path: initialization has already been published.
    if (atomicLoad!(MemoryOrder.acq)(flag))
        return var;

    synchronized (mutex)
    {
        // Re-check under the lock; another thread may have won the race.
        if (!atomicLoad!(MemoryOrder.acq)(flag))
        {
            var = init;
            atomicStore!(MemoryOrder.rel)(flag, true);
        }
    }
    return var;
}

/// Use a separate mutex when init blocks on another thread that might also call initOnce.
@system unittest
{
    import core.sync.mutex : Mutex;

    static shared bool varA, varB;
    __gshared Mutex m;
    m = new Mutex;

    spawn({
        // use a different mutex for varB to avoid a dead-lock
        initOnce!varB(true, m);
        ownerTid.send(true);
    });
    // init depends on the result of the spawned thread
    initOnce!varA(receiveOnly!bool);
    assert(varA == true);
    assert(varB == true);
}

@system unittest
{
    static shared bool a;
    __gshared bool b;
    static bool c;
    bool d;
    initOnce!a(true);
    initOnce!b(true);
    static assert(!__traits(compiles, initOnce!c(true))); // TLS
    static assert(!__traits(compiles, initOnce!d(true))); // local variable
}