Ticket #8007: 0001-Reduce-lock-contention-in-kernel-port-subsystem.patch
File 0001-Reduce-lock-contention-in-kernel-port-subsystem.patch, 38.5 KB (added 11 years ago)
headers/private/kernel/port.h
From 35b31927a9f8741fc6b4f07af3d7577c6af7bb24 Mon Sep 17 00:00:00 2001
From: Julian Harnath <github@orangejua.de>
Date: Sun, 6 Oct 2013 23:34:37 +0200
Subject: [PATCH 1/2] Reduce lock contention in kernel port subsystem.

* Replace the ports list mutex with an R/W-lock.
* Move team port list protection to a separate array of mutexes. This
  relieves contention on sPortsLock by removing Team::port_list from its
  protected items. With this, set_port_owner() only needs to acquire
  sPortsLock for reading.
* Add another hash table holding the ports by name. It is used by
  find_port(), which no longer has to iterate over the ports list.
* Use a slab-based memory allocator for port messages. sPortQuotaLock was
  acquired on every message send or receive and was thus another point of
  contention. The lock is no longer necessary.
* The lock for the port hashes and Port::lock are no longer acquired in a
  nested fashion, to reduce the chance of blocking other threads.
* Make operations concurrency-safe by adding an atomically accessed
  Port::state which provides linearization points for port creation and
  deletion (see the sketch after the diffstat below). Both operations are
  now divided into a logical and a physical part: the logical part just
  updates the state, the physical part adds/removes the port to/from the
  port hash and the team port list.
* set_port_owner() is the only remaining function which still locks
  Port::lock and one or two of sTeamListLock[] in a nested fashion. Since it
  needs to move the port from one team list to another and change
  Port::owner, there is no way around it.
* Ports are now reference counted to make accesses to already-deleted ports
  safe.
---
 headers/private/kernel/port.h |   2 +
 src/system/kernel/port.cpp    | 748 +++++++++++++++++++++++++++---------------
 src/system/kernel/team.cpp    |   2 +-
 3 files changed, 478 insertions(+), 274 deletions(-)
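For readers skimming the patch, here is a minimal, self-contained sketch of the logical/physical split described in the Port::state bullet above. It uses std::atomic and compare_exchange_strong() in place of the kernel's atomic_test_and_set(), and the Port type below is a simplified stand-in for illustration, not the struct the patch actually defines:

// Standalone sketch of logical deletion with a linearization point.
#include <atomic>
#include <cstdio>

struct Port {
	enum State { kUnused = 0, kActive, kDeleted };
	std::atomic<int> state;

	Port() : state(kUnused) {}
};

// The successful compare-and-swap is the linearization point: from then on
// every lookup treats the port as gone, even though it is still physically
// linked into the hash tables and the team port list.
static bool
delete_port_logical(Port& port)
{
	int expected = Port::kActive;
	while (!port.state.compare_exchange_strong(expected, Port::kDeleted)) {
		if (expected == Port::kDeleted)
			return false;			// someone else deleted it first
		expected = Port::kActive;	// still kUnused (being created): retry
	}
	return true;
}

int main()
{
	Port port;
	port.state = Port::kActive;		// creation's linearization point
	std::printf("first delete:  %s\n",
		delete_port_logical(port) ? "ok" : "already gone");
	std::printf("second delete: %s\n",
		delete_port_logical(port) ? "ok" : "already gone");
	return 0;
}

Physical removal from the hash tables and the team port list can then happen later without extra locking discipline, because any lookup racing with it already observes kDeleted.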
diff --git a/headers/private/kernel/port.h b/headers/private/kernel/port.h
index 3237ea5..529fb5d 100644
@@ ... @@ void delete_owned_ports(Team* team);
 int32 port_max_ports(void);
 int32 port_used_ports(void);
 
+size_t port_team_link_offset();
+
 status_t select_port(int32 object, struct select_info *info, bool kernel);
 status_t deselect_port(int32 object, struct select_info *info, bool kernel);
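The declaration added above exists because Port gains a vtable later in this patch, and GCC 4 warns when offsetof() is applied to such a non-standard-layout class (see port_team_link_offset() in port.cpp below). A standalone sketch of that workaround, with simplified stand-in types rather than the kernel's definitions:

// Why offsetof() is avoided and how the offset is computed instead.
#include <cstdio>

struct list_link {
	list_link*	next;
	list_link*	prev;
};

struct Port {
	virtual ~Port() {}		// the vtable makes Port non-standard-layout
	list_link	team_link;
	int			id;
};

static size_t
port_team_link_offset()
{
	// Same trick the patch uses: compute the member offset by hand instead
	// of offsetof(). Formally this is what offsetof() expands to on the
	// targeted compilers, just without the GCC 4 warning.
	Port* port = (Port*)0;
	return (size_t)&port->team_link;
}

int main()
{
	std::printf("Port::team_link is at offset %u\n",
		(unsigned)port_team_link_offset());
	return 0;
}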
src/system/kernel/port.cpp
diff --git a/src/system/kernel/port.cpp b/src/system/kernel/port.cpp index e4ba96c..249df45 100644
@@ ... @@
 #endif
 
 
+#if __GNUC__ >= 3
+#	define GCC_2_NRV(x)
+	// GCC >= 3.1 doesn't need it anymore
+#else
+#	define GCC_2_NRV(x) return x;
+	// GCC 2 named return value syntax
+	// see http://gcc.gnu.org/onlinedocs/gcc-2.95.2/gcc_5.html#SEC106
+#endif
+
+
 // Locking:
-// * sPortsLock: Protects the sPorts hash table, Team::port_list, and
-//   Port::owner.
-// * Port::lock: Protects all Port members save team_link, hash_link, and lock.
-//   id is immutable.
-// * sPortQuotaLock: Protects sTotalSpaceInUse, sAreaChangeCounter,
-//   sWaitingForSpace and the critical section of creating/adding areas for the
-//   port heap in the grow case. It also has to be held when reading
-//   sWaitingForSpace to determine whether or not to notify the
-//   sNoSpaceCondition condition variable.
+// * sPortsLock: Protects the sPorts and sPortsByName hash tables.
+// * sTeamListLock[]: Protects Team::port_list. Lock index for given team is
+//   (Team::id % kTeamListLockCount).
+// * Port::lock: Protects all Port members save team_link, hash_link, lock and
+//   state. id is immutable.
 //
-// The locking order is sPortsLock -> Port::lock. A port must be looked up
-// in sPorts and locked with sPortsLock held. Afterwards sPortsLock can be
-// dropped, unless any field guarded by sPortsLock is accessed.
+// Port::state ensures atomicity by providing a linearization point for adding
+// and removing ports to the hash tables and the team port list.
+// * sPortsLock and sTeamListLock[] are locked separately and not in a nested
+//   fashion, so a port can be in the hash table but not in the team port list
+//   or vice versa. => Without further provisions, insertion and removal are
+//   not linearizable and thus not concurrency-safe.
+// * To make insertion and removal linearizable, Port::state was added. It is
+//   always only accessed atomically and updates are done using
+//   atomic_test_and_set(). A port is only seen as existent when its state is
+//   Port::kActive.
+// * Deletion of ports is done in two steps: logical and physical deletion.
+//   First, logical deletion happens and sets Port::state to Port::kDeleted.
+//   This is an atomic operation and from then on, functions like
+//   get_locked_port() consider this port as deleted and ignore it. Secondly,
+//   physical deletion removes the port from hash tables and team port list.
+//   In a similar way, port creation first inserts into hashes and team list
+//   and only then sets port to Port::kActive.
+//   This creates a linearization point at the atomic update of Port::state,
+//   operations become linearizable and thus concurrency-safe. To help
+//   understanding, the linearization points are annotated with comments.
+// * Ports are reference-counted so it's not a problem when someone still
+//   has a reference to a deleted port.
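As an illustration of the sTeamListLock[] striping described in the locking comment above, here is a small sketch using std::mutex; kTeamListLockCount matches the patch, while the helper names and the team_id typedef are only for the example:

// Lock striping: one mutex per stripe, chosen by team id.
#include <mutex>

enum { kTeamListLockCount = 8 };

static std::mutex sTeamListLock[kTeamListLockCount];

typedef int team_id;

// Each team's port list is guarded by exactly one stripe. Teams that map to
// different stripes never contend with each other, and no single global lock
// serializes all port-list updates.
static std::mutex&
team_ports_list_lock(team_id team)
{
	return sTeamListLock[(unsigned)team % kTeamListLockCount];
}

static void
add_port_to_team(team_id team /*, Port* port */)
{
	std::lock_guard<std::mutex> locker(team_ports_list_lock(team));
	// ... link the port into that team's port_list here ...
}

int main()
{
	add_port_to_team(42);
	return 0;
}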
61 86 62 87 63 88 struct port_message; … … struct port_message : DoublyLinkedListLinkImpl<port_message> { 78 103 typedef DoublyLinkedList<port_message> MessageList; 79 104 80 105 81 struct Port { 106 struct Port : public KernelReferenceable { 107 enum State { 108 kUnused = 0, 109 kActive, 110 kDeleted 111 }; 112 82 113 struct list_link team_link; 83 114 Port* hash_link; 84 115 port_id id; 85 116 team_id owner; 117 Port* name_hash_link; 118 size_t name_hash; 86 119 int32 capacity; 87 120 mutex lock; 121 vint32 state; 88 122 uint32 read_count; 89 123 int32 write_count; 90 124 ConditionVariable read_condition; … … struct Port { 97 131 Port(team_id owner, int32 queueLength, char* name) 98 132 : 99 133 owner(owner), 134 name_hash(0), 100 135 capacity(queueLength), 136 state(kUnused), 101 137 read_count(0), 102 138 write_count(queueLength), 103 139 total_count(0), … … struct Port { 110 146 write_condition.Init(this, "port write"); 111 147 } 112 148 113 ~Port()149 virtual ~Port() 114 150 { 115 151 while (port_message* message = messages.RemoveHead()) 116 152 put_port_message(message); … … struct PortHashDefinition { 149 185 typedef BOpenHashTable<PortHashDefinition> PortHashTable; 150 186 151 187 188 struct PortNameHashDefinition { 189 typedef const char* KeyType; 190 typedef Port ValueType; 191 192 size_t HashKey(const char* key) const 193 { 194 // Hash function: hash(key) = key[0] * 31^(length - 1) 195 // + key[1] * 31^(length - 2) + ... + key[length - 1] 196 197 const size_t length = strlen(key); 198 199 size_t hash = 0; 200 for (size_t index = 0; index < length; index++) 201 hash = 31 * hash + key[index]; 202 203 return hash; 204 } 205 206 size_t Hash(Port* value) const 207 { 208 size_t& hash = value->name_hash; 209 if (hash == 0) 210 hash = HashKey(value->lock.name); 211 return hash; 212 } 213 214 bool Compare(const char* key, Port* value) const 215 { 216 return (strcmp(key, value->lock.name) == 0); 217 } 218 219 Port*& GetLink(Port* value) const 220 { 221 return value->name_hash_link; 222 } 223 }; 224 225 typedef BOpenHashTable<PortNameHashDefinition> PortNameHashTable; 226 227 152 228 class PortNotificationService : public DefaultNotificationService { 153 229 public: 154 230 PortNotificationService(); … … public: 157 233 }; 158 234 159 235 236 // #pragma mark - tracing 237 238 160 239 #if PORT_TRACING 161 240 namespace PortTracing { 162 241 … … static const size_t kBufferGrowRate = kInitialPortBufferSize; 338 417 #define PORT_MAX_MESSAGE_SIZE (256 * 1024) 339 418 340 419 static int32 sMaxPorts = 4096; 341 static int32 sUsedPorts = 0;420 static vint32 sUsedPorts; 342 421 343 422 static PortHashTable sPorts; 344 static heap_allocator* sPortAllocator;423 static PortNameHashTable sPortsByName; 345 424 static ConditionVariable sNoSpaceCondition; 346 static int32 sTotalSpaceInUse; 347 static int32 sAreaChangeCounter; 348 static int32 sWaitingForSpace; 425 static vint32 sTotalSpaceCommited; 426 static vint32 sWaitingForSpace; 349 427 static port_id sNextPortID = 1; 350 428 static bool sPortsActive = false; 351 static mutex sPortsLock = MUTEX_INITIALIZER("ports list"); 352 static mutex sPortQuotaLock = MUTEX_INITIALIZER("port quota"); 429 static rw_lock sPortsLock = RW_LOCK_INITIALIZER("ports list"); 430 431 enum { 432 kTeamListLockCount = 8 433 }; 434 435 static mutex sTeamListLock[kTeamListLockCount] = { 436 MUTEX_INITIALIZER("team ports list 1"), 437 MUTEX_INITIALIZER("team ports list 2"), 438 MUTEX_INITIALIZER("team ports list 3"), 439 MUTEX_INITIALIZER("team ports list 4"), 440 
MUTEX_INITIALIZER("team ports list 5"), 441 MUTEX_INITIALIZER("team ports list 6"), 442 MUTEX_INITIALIZER("team ports list 7"), 443 MUTEX_INITIALIZER("team ports list 8") 444 }; 353 445 354 446 static PortNotificationService sNotificationService; 355 447 … … PortNotificationService::Notify(uint32 opcode, port_id port) 377 469 } 378 470 379 471 380 // #pragma mark - 472 // #pragma mark - debugger commands 381 473 382 474 383 475 static int … … dump_port_info(int argc, char** argv) 463 555 // if the argument looks like a number, treat it as such 464 556 int32 num = parse_expression(argv[1]); 465 557 Port* port = sPorts.Lookup(num); 466 if (port == NULL ) {558 if (port == NULL || port->state != Port::kActive) { 467 559 kprintf("port %" B_PRId32 " (%#" B_PRIx32 ") doesn't exist!\n", 468 560 num, num); 469 561 return 0; … … dump_port_info(int argc, char** argv) 489 581 } 490 582 491 583 584 // #pragma mark - internal helper functions 585 586 492 587 /*! Notifies the port's select events. 493 588 The port must be locked. 494 589 */ … … notify_port_select_events(Port* port, uint16 events) 500 595 } 501 596 502 597 503 static Port*504 get_locked_port(port_id id) 598 static BReference<Port> 599 get_locked_port(port_id id) GCC_2_NRV(portRef) 505 600 { 506 MutexLocker portsLocker(sPortsLock); 601 #if __GNUC__ >= 3 602 BReference<Port> portRef; 603 #endif 604 { 605 ReadLocker portsLocker(sPortsLock); 606 portRef.SetTo(sPorts.Lookup(id)); 607 } 608 609 if (portRef != NULL && portRef->state == Port::kActive) 610 mutex_lock(&portRef->lock); 611 else 612 portRef.Unset(); 507 613 508 Port* port = sPorts.Lookup(id); 509 if (port != NULL) 510 mutex_lock(&port->lock); 511 return port; 614 return portRef; 615 } 616 617 618 static BReference<Port> 619 get_port(port_id id) GCC_2_NRV(portRef) 620 { 621 #if __GNUC__ >= 3 622 BReference<Port> portRef; 623 #endif 624 ReadLocker portsLocker(sPortsLock); 625 portRef.SetTo(sPorts.Lookup(id)); 626 627 return portRef; 512 628 } 513 629 514 630 … … is_port_closed(Port* port) 523 639 static void 524 640 put_port_message(port_message* message) 525 641 { 526 size_t size = sizeof(port_message) + message->size;527 heap_free(sPortAllocator,message);642 const size_t size = sizeof(port_message) + message->size; 643 free(message); 528 644 529 MutexLocker quotaLocker(sPortQuotaLock); 530 sTotalSpaceInUse -= size; 645 atomic_add(&sTotalSpaceCommited, -size); 531 646 if (sWaitingForSpace > 0) 532 647 sNoSpaceCondition.NotifyAll(); 533 648 } 534 649 535 650 651 /*! Port must be locked. */ 536 652 static status_t 537 653 get_port_message(int32 code, size_t bufferSize, uint32 flags, bigtime_t timeout, 538 654 port_message** _message, Port& port) 539 655 { 540 size_t size = sizeof(port_message) + bufferSize; 541 bool needToWait = false; 542 543 MutexLocker quotaLocker(sPortQuotaLock); 656 const size_t size = sizeof(port_message) + bufferSize; 544 657 545 658 while (true) { 546 while (sTotalSpaceInUse + size > kTotalSpaceLimit || needToWait) { 659 int32 previouslyCommited = atomic_add(&sTotalSpaceCommited, size); 660 661 while (previouslyCommited + size > kTotalSpaceLimit) { 547 662 // TODO: add per team limit 548 // We are not allowed to create another heap area, as our 663 664 // We are not allowed to allocate more memory, as our 549 665 // space limit has been reached - just wait until we get 550 666 // some free space again. 
551 667 668 atomic_add(&sTotalSpaceCommited, -size); 669 552 670 // TODO: we don't want to wait - but does that also mean we 553 // shouldn't wait for the area creation?671 // shouldn't wait for free memory? 554 672 if ((flags & B_RELATIVE_TIMEOUT) != 0 && timeout <= 0) 555 673 return B_WOULD_BLOCK; 556 674 557 675 ConditionVariableEntry entry; 558 676 sNoSpaceCondition.Add(&entry); 559 677 560 sWaitingForSpace++;561 quotaLocker.Unlock();562 563 678 port_id portID = port.id; 564 679 mutex_unlock(&port.lock); 565 680 681 atomic_add(&sWaitingForSpace, 1); 682 683 // TODO: right here the condition could be notified and we'd 684 // miss it. 685 566 686 status_t status = entry.Wait(flags, timeout); 567 687 568 // re-lock the port and the quota569 Port* newPort = get_locked_port(portID); 570 quotaLocker.Lock();571 sWaitingForSpace--;688 atomic_add(&sWaitingForSpace, -1); 689 690 // re-lock the port 691 BReference<Port> newPortRef = get_locked_port(portID); 572 692 573 if (newPort != &port || is_port_closed(&port)) {693 if (newPortRef.Get() != &port || is_port_closed(&port)) { 574 694 // the port is no longer usable 575 695 return B_BAD_PORT_ID; 576 696 } … … get_port_message(int32 code, size_t bufferSize, uint32 flags, bigtime_t timeout, 578 698 if (status == B_TIMED_OUT) 579 699 return B_TIMED_OUT; 580 700 581 needToWait = false;701 previouslyCommited = atomic_add(&sTotalSpaceCommited, size); 582 702 continue; 583 703 } 584 704 585 int32 areaChangeCounter = sAreaChangeCounter;586 sTotalSpaceInUse += size;587 quotaLocker.Unlock();588 589 705 // Quota is fulfilled, try to allocate the buffer 590 591 port_message* message 592 = (port_message*)heap_memalign(sPortAllocator, 0, size); 706 port_message* message = (port_message*)malloc(size); 593 707 if (message != NULL) { 594 708 message->code = code; 595 709 message->size = bufferSize; … … get_port_message(int32 code, size_t bufferSize, uint32 flags, bigtime_t timeout, 598 712 return B_OK; 599 713 } 600 714 601 quotaLocker.Lock(); 602 603 // We weren't able to allocate and we'll start over, including 604 // re-acquireing the quota, so we remove our size from the in-use 605 // counter again. 606 sTotalSpaceInUse -= size; 607 608 if (areaChangeCounter != sAreaChangeCounter) { 609 // There was already an area added since we tried allocating, 610 // start over. 611 continue; 612 } 613 614 // Create a new area for the heap to use 615 616 addr_t base; 617 area_id area = create_area("port grown buffer", (void**)&base, 618 B_ANY_KERNEL_ADDRESS, kBufferGrowRate, B_NO_LOCK, 619 B_KERNEL_READ_AREA | B_KERNEL_WRITE_AREA); 620 if (area < 0) { 621 // We'll have to get by with what we have, so wait for someone 622 // to free a message instead. We enforce waiting so that we don't 623 // try to create a new area over and over. 624 needToWait = true; 625 continue; 626 } 627 628 heap_add_area(sPortAllocator, area, base, kBufferGrowRate); 629 630 sAreaChangeCounter++; 631 if (sWaitingForSpace > 0) 632 sNoSpaceCondition.NotifyAll(); 715 // We weren't able to allocate and we'll start over,so we remove our 716 // size from the commited-counter again. 
717 atomic_add(&sTotalSpaceCommited, -size); 718 continue; 633 719 } 634 720 } 635 721 … … copy_port_message(port_message* message, int32* _code, void* buffer, 677 763 678 764 679 765 static void 680 uninit_port _locked(Port* port)766 uninit_port(Port* port) 681 767 { 768 MutexLocker locker(port->lock); 769 682 770 notify_port_select_events(port, B_EVENT_INVALID); 683 771 port->select_infos = NULL; 684 772 … … uninit_port_locked(Port* port) 690 778 } 691 779 692 780 781 /*! Caller must ensure there is still a reference to the port. (Either by 782 * holding a reference itself or by holding a lock on one of the data 783 * structures in which it is referenced.) 784 */ 785 static status_t 786 delete_port_logical(Port* port) 787 { 788 for (;;) { 789 // Try to logically delete 790 const int32 oldState = atomic_test_and_set(&port->state, 791 Port::kDeleted, Port::kActive); 792 // Linearization point for port deletion 793 794 switch (oldState) { 795 case Port::kActive: 796 // Logical deletion succesful 797 return B_OK; 798 799 case Port::kDeleted: 800 // Someone else already deleted it in the meantime 801 TRACE(("delete_port_logical: already deleted port_id %ld\n", 802 port->id)); 803 return B_BAD_PORT_ID; 804 805 case Port::kUnused: 806 // Port is still being created, retry 807 continue; 808 809 default: 810 // Port state got corrupted somehow 811 panic("Invalid port state!\n"); 812 } 813 } 814 } 815 816 693 817 // #pragma mark - private kernel API 694 818 695 819 … … delete_owned_ports(Team* team) 700 824 { 701 825 TRACE(("delete_owned_ports(owner = %ld)\n", team->id)); 702 826 703 MutexLocker portsLocker(sPortsLock); 827 list deletionList; 828 list_init_etc(&deletionList, port_team_link_offset()); 704 829 705 // move the ports from the team's port list to a local list 706 struct list queue; 707 list_move_to_list(&team->port_list, &queue); 830 const uint8 lockIndex = team->id % kTeamListLockCount; 831 MutexLocker teamPortsListLocker(sTeamListLock[lockIndex]); 708 832 709 // iterate through the list or ports, remove them from the hash table and710 // uninitialize them711 Port* port = (Port*)list_get_first_item(& queue);833 // Try to logically delete all ports from the team's port list. 834 // On success, move the port to deletionList. 
835 Port* port = (Port*)list_get_first_item(&team->port_list); 712 836 while (port != NULL) { 713 MutexLocker locker(port->lock); 714 sPorts.Remove(port); 715 uninit_port_locked(port); 716 sUsedPorts--; 717 718 port = (Port*)list_get_next_item(&queue, port); 837 status_t status = delete_port_logical(port); 838 // Contains linearization point 839 840 Port* nextPort = (Port*)list_get_next_item(&team->port_list, port); 841 842 if (status == B_OK) { 843 list_remove_link(&port->team_link); 844 list_add_item(&deletionList, port); 845 } 846 847 port = nextPort; 719 848 } 720 849 721 portsLocker.Unlock();850 teamPortsListLocker.Unlock(); 722 851 723 // delete the ports 724 while (Port* port = (Port*)list_remove_head_item(&queue)) 725 delete port; 852 // Remove all ports in deletionList from hashes 853 { 854 WriteLocker portsLocker(sPortsLock); 855 856 for (Port* port = (Port*)list_get_first_item(&deletionList); 857 port != NULL; 858 port = (Port*)list_get_next_item(&deletionList, port)) { 859 860 sPorts.Remove(port); 861 sPortsByName.Remove(port); 862 port->ReleaseReference(); 863 // joint reference for sPorts and sPortsByName 864 } 865 } 866 867 // Uninitialize ports and release team port list references 868 while (Port* port = (Port*)list_remove_head_item(&deletionList)) { 869 atomic_add(&sUsedPorts, -1); 870 uninit_port(port); 871 port->ReleaseReference(); 872 // Reference for team port list 873 } 726 874 } 727 875 728 876 … … port_used_ports(void) 740 888 } 741 889 742 890 891 size_t 892 port_team_link_offset() 893 { 894 // Somewhat ugly workaround since we cannot use offsetof() for a class 895 // with vtable (GCC4 throws a warning then). 896 Port* port = (Port*)0; 897 return (size_t)&port->team_link; 898 } 899 900 743 901 status_t 744 902 port_init(kernel_args *args) 745 903 { 746 // initialize ports table 904 // initialize ports table and by-name hash 747 905 new(&sPorts) PortHashTable; 748 906 if (sPorts.Init() != B_OK) { 749 907 panic("Failed to init port hash table!"); 750 908 return B_NO_MEMORY; 751 909 } 752 910 753 addr_t base; 754 if (create_area("port heap", (void**)&base, B_ANY_KERNEL_ADDRESS, 755 kInitialPortBufferSize, B_NO_LOCK, 756 B_KERNEL_READ_AREA | B_KERNEL_WRITE_AREA) < 0) { 757 // TODO: Since port_init() is invoked before the boot partition is 758 // mounted, the underlying VMAnonymousCache cannot commit swap space 759 // upon creation and thus the pages aren't swappable after all. This 760 // makes the area essentially B_LAZY_LOCK with additional overhead. 
761 panic("unable to allocate port area!\n"); 762 return B_ERROR; 763 } 764 765 static const heap_class kBufferHeapClass = { "port heap", 100, 766 PORT_MAX_MESSAGE_SIZE + sizeof(port_message), 2 * 1024, 767 sizeof(port_message), 4, 2, 24 }; 768 sPortAllocator = heap_create_allocator("port buffer", base, 769 kInitialPortBufferSize, &kBufferHeapClass, true); 770 if (sPortAllocator == NULL) { 771 panic("unable to create port heap"); 911 new(&sPortsByName) PortNameHashTable; 912 if (sPortsByName.Init() != B_OK) { 913 panic("Failed to init port by name hash table!"); 772 914 return B_NO_MEMORY; 773 915 } 774 916 … … create_port(int32 queueLength, const char* name) 830 972 free(nameBuffer); 831 973 return B_NO_MEMORY; 832 974 } 833 ObjectDeleter<Port> portDeleter(port);834 835 MutexLocker locker(sPortsLock);836 975 837 976 // check the ports limit 838 if (sUsedPorts >= sMaxPorts) 977 const int32 previouslyUsed = atomic_add(&sUsedPorts, 1); 978 if (previouslyUsed + 1 >= sMaxPorts) { 979 atomic_add(&sUsedPorts, -1); 980 delete port; 839 981 return B_NO_MORE_PORTS; 982 } 840 983 841 sUsedPorts++; 984 { 985 WriteLocker locker(sPortsLock); 842 986 843 // allocate a port ID844 do {845 port->id = sNextPortID++;987 // allocate a port ID 988 do { 989 port->id = sNextPortID++; 846 990 847 // handle integer overflow848 if (sNextPortID < 0)849 sNextPortID = 1;850 } while (sPorts.Lookup(port->id) != NULL);991 // handle integer overflow 992 if (sNextPortID < 0) 993 sNextPortID = 1; 994 } while (sPorts.Lookup(port->id) != NULL); 851 995 852 // insert port in table and team list 853 sPorts.Insert(port); 854 list_add_item(&team->port_list, &port->team_link); 855 portDeleter.Detach(); 996 // Insert port physically: 997 // (1/2) Insert into hash tables 998 port->AcquireReference(); 999 // joint reference for sPorts and sPortsByName 1000 1001 sPorts.Insert(port); 1002 sPortsByName.Insert(port); 1003 } 1004 1005 // (2/2) Insert into team list 1006 { 1007 const uint8 lockIndex = port->owner % kTeamListLockCount; 1008 MutexLocker teamPortsListLocker(sTeamListLock[lockIndex]); 1009 port->AcquireReference(); 1010 list_add_item(&team->port_list, port); 1011 } 856 1012 857 1013 // tracing, notifications, etc. 858 1014 T(Create(port)); 859 1015 860 port_id id = port->id;1016 const port_id id = port->id; 861 1017 862 locker.Unlock(); 1018 // Insert port logically by marking it active 1019 const int32 oldState = atomic_test_and_set(&port->state, 1020 Port::kActive, Port::kUnused); 1021 // Linearization point for port creation 1022 1023 if (oldState != Port::kUnused) { 1024 // Nobody is allowed to tamper with the port before it's active. 
1025 panic("Port state was modified during creation!\n"); 1026 } 863 1027 864 1028 TRACE(("create_port() done: port created %ld\n", id)); 865 1029 … … close_port(port_id id) 877 1041 return B_BAD_PORT_ID; 878 1042 879 1043 // get the port 880 Port* port= get_locked_port(id);881 if (port == NULL) {1044 BReference<Port> portRef = get_locked_port(id); 1045 if (portRef == NULL) { 882 1046 TRACE(("close_port: invalid port_id %ld\n", id)); 883 1047 return B_BAD_PORT_ID; 884 1048 } 885 MutexLocker lock(&port ->lock, true);1049 MutexLocker lock(&portRef->lock, true); 886 1050 887 1051 // mark port to disable writing - deleting the semaphores will 888 1052 // wake up waiting read/writes 889 port ->capacity = 0;1053 portRef->capacity = 0; 890 1054 891 notify_port_select_events(port , B_EVENT_INVALID);892 port ->select_infos = NULL;1055 notify_port_select_events(portRef, B_EVENT_INVALID); 1056 portRef->select_infos = NULL; 893 1057 894 port ->read_condition.NotifyAll(false, B_BAD_PORT_ID);895 port ->write_condition.NotifyAll(false, B_BAD_PORT_ID);1058 portRef->read_condition.NotifyAll(false, B_BAD_PORT_ID); 1059 portRef->write_condition.NotifyAll(false, B_BAD_PORT_ID); 896 1060 897 1061 return B_OK; 898 1062 } … … delete_port(port_id id) 906 1070 if (!sPortsActive || id < 0) 907 1071 return B_BAD_PORT_ID; 908 1072 909 // get the port and remove it from the hash table and the team 910 Port* port; 911 MutexLocker locker; 912 { 913 MutexLocker portsLocker(sPortsLock); 1073 BReference<Port> portRef = get_port(id); 914 1074 915 port = sPorts.Lookup(id); 916 if (port == NULL) { 917 TRACE(("delete_port: invalid port_id %ld\n", id)); 918 return B_BAD_PORT_ID; 919 } 1075 if (portRef == NULL) { 1076 TRACE(("delete_port: invalid port_id %ld\n", id)); 1077 return B_BAD_PORT_ID; 1078 } 1079 1080 status_t status = delete_port_logical(portRef); 1081 // Contains linearization point 1082 if (status != B_OK) 1083 return status; 1084 1085 // Now remove port physically: 1086 // (1/2) Remove from hash tables 1087 { 1088 WriteLocker portsLocker(sPortsLock); 920 1089 921 sPorts.Remove(port );922 list_remove_link(&port->team_link);1090 sPorts.Remove(portRef); 1091 sPortsByName.Remove(portRef); 923 1092 924 sUsedPorts--; 1093 portRef->ReleaseReference(); 1094 // joint reference for sPorts and sPortsByName 1095 } 925 1096 926 locker.SetTo(port->lock, false); 1097 // (2/2) Remove from team port list 1098 { 1099 const uint8 lockIndex = portRef->owner % kTeamListLockCount; 1100 MutexLocker teamPortsListLocker(sTeamListLock[lockIndex]); 927 1101 928 uninit_port_locked(port); 1102 list_remove_link(&portRef->team_link); 1103 portRef->ReleaseReference(); 929 1104 } 930 1105 931 T(Delete(port));1106 uninit_port(portRef); 932 1107 933 locker.Unlock();1108 T(Delete(portRef)); 934 1109 935 delete port;1110 atomic_add(&sUsedPorts, -1); 936 1111 937 1112 return B_OK; 938 1113 } … … select_port(int32 id, struct select_info* info, bool kernel) 945 1120 return B_BAD_PORT_ID; 946 1121 947 1122 // get the port 948 Port* port= get_locked_port(id);949 if (port == NULL)1123 BReference<Port> portRef = get_locked_port(id); 1124 if (portRef == NULL) 950 1125 return B_BAD_PORT_ID; 951 MutexLocker locker(port ->lock, true);1126 MutexLocker locker(portRef->lock, true); 952 1127 953 1128 // port must not yet be closed 954 if (is_port_closed(port ))1129 if (is_port_closed(portRef)) 955 1130 return B_BAD_PORT_ID; 956 1131 957 if (!kernel && port ->owner == team_get_kernel_team_id()) {1132 if (!kernel && portRef->owner == team_get_kernel_team_id()) { 958 
1133 // kernel port, but call from userland 959 1134 return B_NOT_ALLOWED; 960 1135 } … … select_port(int32 id, struct select_info* info, bool kernel) 964 1139 if (info->selected_events != 0) { 965 1140 uint16 events = 0; 966 1141 967 info->next = port ->select_infos;968 port ->select_infos = info;1142 info->next = portRef->select_infos; 1143 portRef->select_infos = info; 969 1144 970 1145 // check for events 971 1146 if ((info->selected_events & B_EVENT_READ) != 0 972 && !port ->messages.IsEmpty()) {1147 && !portRef->messages.IsEmpty()) { 973 1148 events |= B_EVENT_READ; 974 1149 } 975 1150 976 if (port ->write_count > 0)1151 if (portRef->write_count > 0) 977 1152 events |= B_EVENT_WRITE; 978 1153 979 1154 if (events != 0) … … deselect_port(int32 id, struct select_info* info, bool kernel) 993 1168 return B_OK; 994 1169 995 1170 // get the port 996 Port* port= get_locked_port(id);997 if (port == NULL)1171 BReference<Port> portRef = get_locked_port(id); 1172 if (portRef == NULL) 998 1173 return B_BAD_PORT_ID; 999 MutexLocker locker(port ->lock, true);1174 MutexLocker locker(portRef->lock, true); 1000 1175 1001 1176 // find and remove the infos 1002 select_info** infoLocation = &port ->select_infos;1177 select_info** infoLocation = &portRef->select_infos; 1003 1178 while (*infoLocation != NULL && *infoLocation != info) 1004 1179 infoLocation = &(*infoLocation)->next; 1005 1180 … … find_port(const char* name) 1022 1197 if (name == NULL) 1023 1198 return B_BAD_VALUE; 1024 1199 1025 MutexLocker portsLocker(sPortsLock);1026 1027 for (PortHashTable::Iterator it = sPorts.GetIterator();1028 Port* port = it.Next();) {1029 if (!strcmp(name, port->lock.name))1030 return port->id;1031 }1200 ReadLocker locker(sPortsLock); 1201 Port* port = sPortsByName.Lookup(name); 1202 // Since we have sPortsLock and don't return the port itself, 1203 // no BReference necessary 1204 1205 if (port != NULL && port->state == Port::kActive) 1206 return port->id; 1032 1207 1033 1208 return B_NAME_NOT_FOUND; 1034 1209 } … … _get_port_info(port_id id, port_info* info, size_t size) 1045 1220 return B_BAD_PORT_ID; 1046 1221 1047 1222 // get the port 1048 Port* port= get_locked_port(id);1049 if (port == NULL) {1223 BReference<Port> portRef = get_locked_port(id); 1224 if (portRef == NULL) { 1050 1225 TRACE(("get_port_info: invalid port_id %ld\n", id)); 1051 1226 return B_BAD_PORT_ID; 1052 1227 } 1053 MutexLocker locker(port ->lock, true);1228 MutexLocker locker(portRef->lock, true); 1054 1229 1055 1230 // fill a port_info struct with info 1056 fill_port_info(port , info, size);1231 fill_port_info(portRef, info, size); 1057 1232 return B_OK; 1058 1233 } 1059 1234 … … _get_next_port_info(team_id teamID, int32* _cookie, struct port_info* info, 1077 1252 BReference<Team> teamReference(team, true); 1078 1253 1079 1254 // iterate through the team's port list 1080 MutexLocker portsLocker(sPortsLock); 1255 const uint8 lockIndex = teamID % kTeamListLockCount; 1256 MutexLocker teamPortsListLocker(sTeamListLock[lockIndex]); 1081 1257 1082 1258 int32 stopIndex = *_cookie; 1083 1259 int32 index = 0; … … _get_next_port_info(team_id teamID, int32* _cookie, struct port_info* info, 1097 1273 return B_BAD_PORT_ID; 1098 1274 1099 1275 // fill in the port info 1100 MutexLocker locker(port->lock); 1101 portsLocker.Unlock(); 1102 fill_port_info(port, info, size); 1276 BReference<Port> portRef = port; 1277 teamPortsListLocker.Unlock(); 1278 // Only use portRef below this line... 
1279 1280 MutexLocker locker(portRef->lock); 1281 fill_port_info(portRef, info, size); 1103 1282 1104 1283 *_cookie = stopIndex + 1; 1105 1284 return B_OK; … … _get_port_message_info_etc(port_id id, port_message_info* info, 1135 1314 | B_ABSOLUTE_TIMEOUT; 1136 1315 1137 1316 // get the port 1138 Port* port= get_locked_port(id);1139 if (port == NULL)1317 BReference<Port> portRef = get_locked_port(id); 1318 if (portRef == NULL) 1140 1319 return B_BAD_PORT_ID; 1141 MutexLocker locker(port ->lock, true);1320 MutexLocker locker(portRef->lock, true); 1142 1321 1143 if (is_port_closed(port ) && port->messages.IsEmpty()) {1144 T(Info(port , 0, B_BAD_PORT_ID));1322 if (is_port_closed(portRef) && portRef->messages.IsEmpty()) { 1323 T(Info(portRef, 0, B_BAD_PORT_ID)); 1145 1324 TRACE(("_get_port_message_info_etc(): closed port %ld\n", id)); 1146 1325 return B_BAD_PORT_ID; 1147 1326 } 1148 1327 1149 while (port ->read_count == 0) {1328 while (portRef->read_count == 0) { 1150 1329 // We need to wait for a message to appear 1151 1330 if ((flags & B_RELATIVE_TIMEOUT) != 0 && timeout <= 0) 1152 1331 return B_WOULD_BLOCK; 1153 1332 1154 1333 ConditionVariableEntry entry; 1155 port ->read_condition.Add(&entry);1334 portRef->read_condition.Add(&entry); 1156 1335 1157 1336 locker.Unlock(); 1158 1337 … … _get_port_message_info_etc(port_id id, port_message_info* info, 1165 1344 } 1166 1345 1167 1346 // re-lock 1168 Port* newPort= get_locked_port(id);1169 if (newPort == NULL) {1347 BReference<Port> newPortRef = get_locked_port(id); 1348 if (newPortRef == NULL) { 1170 1349 T(Info(id, 0, 0, 0, B_BAD_PORT_ID)); 1171 1350 return B_BAD_PORT_ID; 1172 1351 } 1173 locker.SetTo(newPort ->lock, true);1352 locker.SetTo(newPortRef->lock, true); 1174 1353 1175 if (newPort != port1176 || (is_port_closed(port ) && port->messages.IsEmpty())) {1354 if (newPortRef != portRef 1355 || (is_port_closed(portRef) && portRef->messages.IsEmpty())) { 1177 1356 // the port is no longer there 1178 1357 T(Info(id, 0, 0, 0, B_BAD_PORT_ID)); 1179 1358 return B_BAD_PORT_ID; … … _get_port_message_info_etc(port_id id, port_message_info* info, 1181 1360 } 1182 1361 1183 1362 // determine tail & get the length of the message 1184 port_message* message = port ->messages.Head();1363 port_message* message = portRef->messages.Head(); 1185 1364 if (message == NULL) { 1186 panic("port %" B_PRId32 ": no messages found\n", port ->id);1365 panic("port %" B_PRId32 ": no messages found\n", portRef->id); 1187 1366 return B_ERROR; 1188 1367 } 1189 1368 … … _get_port_message_info_etc(port_id id, port_message_info* info, 1195 1374 T(Info(id, id->read_count, id->write_count, message->code, B_OK)); 1196 1375 1197 1376 // notify next one, as we haven't read from the port 1198 port ->read_condition.NotifyOne();1377 portRef->read_condition.NotifyOne(); 1199 1378 1200 1379 return B_OK; 1201 1380 } … … port_count(port_id id) 1208 1387 return B_BAD_PORT_ID; 1209 1388 1210 1389 // get the port 1211 Port* port= get_locked_port(id);1212 if (port == NULL) {1390 BReference<Port> portRef = get_locked_port(id); 1391 if (portRef == NULL) { 1213 1392 TRACE(("port_count: invalid port_id %ld\n", id)); 1214 1393 return B_BAD_PORT_ID; 1215 1394 } 1216 MutexLocker locker(port ->lock, true);1395 MutexLocker locker(portRef->lock, true); 1217 1396 1218 1397 // return count of messages 1219 return port ->read_count;1398 return portRef->read_count; 1220 1399 } 1221 1400 1222 1401 … … read_port_etc(port_id id, int32* _code, void* buffer, size_t bufferSize, 1244 1423 | B_ABSOLUTE_TIMEOUT; 
1245 1424 1246 1425 // get the port 1247 Port* port= get_locked_port(id);1248 if (port == NULL)1426 BReference<Port> portRef = get_locked_port(id); 1427 if (portRef == NULL) 1249 1428 return B_BAD_PORT_ID; 1250 MutexLocker locker(port ->lock, true);1429 MutexLocker locker(portRef->lock, true); 1251 1430 1252 if (is_port_closed(port ) && port->messages.IsEmpty()) {1253 T(Read(port , 0, B_BAD_PORT_ID));1431 if (is_port_closed(portRef) && portRef->messages.IsEmpty()) { 1432 T(Read(portRef, 0, B_BAD_PORT_ID)); 1254 1433 TRACE(("read_port_etc(): closed port %ld\n", id)); 1255 1434 return B_BAD_PORT_ID; 1256 1435 } 1257 1436 1258 while (port ->read_count == 0) {1437 while (portRef->read_count == 0) { 1259 1438 if ((flags & B_RELATIVE_TIMEOUT) != 0 && timeout <= 0) 1260 1439 return B_WOULD_BLOCK; 1261 1440 1262 1441 // We need to wait for a message to appear 1263 1442 ConditionVariableEntry entry; 1264 port ->read_condition.Add(&entry);1443 portRef->read_condition.Add(&entry); 1265 1444 1266 1445 locker.Unlock(); 1267 1446 … … read_port_etc(port_id id, int32* _code, void* buffer, size_t bufferSize, 1269 1448 status_t status = entry.Wait(flags, timeout); 1270 1449 1271 1450 // re-lock 1272 Port* newPort= get_locked_port(id);1273 if (newPort == NULL) {1451 BReference<Port> newPortRef = get_locked_port(id); 1452 if (newPortRef == NULL) { 1274 1453 T(Read(id, 0, 0, 0, B_BAD_PORT_ID)); 1275 1454 return B_BAD_PORT_ID; 1276 1455 } 1277 locker.SetTo(newPort ->lock, true);1456 locker.SetTo(newPortRef->lock, true); 1278 1457 1279 if (newPort != port1280 || (is_port_closed(port ) && port->messages.IsEmpty())) {1458 if (newPortRef != portRef 1459 || (is_port_closed(portRef) && portRef->messages.IsEmpty())) { 1281 1460 // the port is no longer there 1282 1461 T(Read(id, 0, 0, 0, B_BAD_PORT_ID)); 1283 1462 return B_BAD_PORT_ID; 1284 1463 } 1285 1464 1286 1465 if (status != B_OK) { 1287 T(Read(port , 0, status));1466 T(Read(portRef, 0, status)); 1288 1467 return status; 1289 1468 } 1290 1469 } 1291 1470 1292 1471 // determine tail & get the length of the message 1293 port_message* message = port ->messages.Head();1472 port_message* message = portRef->messages.Head(); 1294 1473 if (message == NULL) { 1295 panic("port %" B_PRId32 ": no messages found\n", port ->id);1474 panic("port %" B_PRId32 ": no messages found\n", portRef->id); 1296 1475 return B_ERROR; 1297 1476 } 1298 1477 … … read_port_etc(port_id id, int32* _code, void* buffer, size_t bufferSize, 1300 1479 size_t size = copy_port_message(message, _code, buffer, bufferSize, 1301 1480 userCopy); 1302 1481 1303 T(Read(port , message->code, size));1482 T(Read(portRef, message->code, size)); 1304 1483 1305 port ->read_condition.NotifyOne();1484 portRef->read_condition.NotifyOne(); 1306 1485 // we only peeked, but didn't grab the message 1307 1486 return size; 1308 1487 } 1309 1488 1310 port ->messages.RemoveHead();1311 port ->total_count++;1312 port ->write_count++;1313 port ->read_count--;1489 portRef->messages.RemoveHead(); 1490 portRef->total_count++; 1491 portRef->write_count++; 1492 portRef->read_count--; 1314 1493 1315 notify_port_select_events(port , B_EVENT_WRITE);1316 port ->write_condition.NotifyOne();1494 notify_port_select_events(portRef, B_EVENT_WRITE); 1495 portRef->write_condition.NotifyOne(); 1317 1496 // make one spot in queue available again for write 1318 1497 1319 T(Read(id, port ->read_count, port->write_count, message->code,1498 T(Read(id, portRef->read_count, portRef->write_count, message->code, 1320 1499 min_c(bufferSize, 
message->size))); 1321 1500 1322 1501 locker.Unlock(); … … writev_port_etc(port_id id, int32 msgCode, const iovec* msgVecs, 1374 1553 port_message* message = NULL; 1375 1554 1376 1555 // get the port 1377 Port* port= get_locked_port(id);1378 if (port == NULL) {1556 BReference<Port> portRef = get_locked_port(id); 1557 if (portRef == NULL) { 1379 1558 TRACE(("write_port_etc: invalid port_id %ld\n", id)); 1380 1559 return B_BAD_PORT_ID; 1381 1560 } 1382 MutexLocker locker(port ->lock, true);1561 MutexLocker locker(portRef->lock, true); 1383 1562 1384 if (is_port_closed(port )) {1563 if (is_port_closed(portRef)) { 1385 1564 TRACE(("write_port_etc: port %ld closed\n", id)); 1386 1565 return B_BAD_PORT_ID; 1387 1566 } 1388 1567 1389 if (port ->write_count <= 0) {1568 if (portRef->write_count <= 0) { 1390 1569 if ((flags & B_RELATIVE_TIMEOUT) != 0 && timeout <= 0) 1391 1570 return B_WOULD_BLOCK; 1392 1571 1393 port ->write_count--;1572 portRef->write_count--; 1394 1573 1395 1574 // We need to block in order to wait for a free message slot 1396 1575 ConditionVariableEntry entry; 1397 port ->write_condition.Add(&entry);1576 portRef->write_condition.Add(&entry); 1398 1577 1399 1578 locker.Unlock(); 1400 1579 1401 1580 status = entry.Wait(flags, timeout); 1402 1581 1403 1582 // re-lock 1404 Port* newPort= get_locked_port(id);1405 if (newPort == NULL) {1583 BReference<Port> newPortRef = get_locked_port(id); 1584 if (newPortRef == NULL) { 1406 1585 T(Write(id, 0, 0, 0, 0, B_BAD_PORT_ID)); 1407 1586 return B_BAD_PORT_ID; 1408 1587 } 1409 locker.SetTo(newPort ->lock, true);1588 locker.SetTo(newPortRef->lock, true); 1410 1589 1411 if (newPort != port || is_port_closed(port)) {1590 if (newPortRef != portRef || is_port_closed(portRef)) { 1412 1591 // the port is no longer there 1413 1592 T(Write(id, 0, 0, 0, 0, B_BAD_PORT_ID)); 1414 1593 return B_BAD_PORT_ID; … … writev_port_etc(port_id id, int32 msgCode, const iovec* msgVecs, 1417 1596 if (status != B_OK) 1418 1597 goto error; 1419 1598 } else 1420 port ->write_count--;1599 portRef->write_count--; 1421 1600 1422 1601 status = get_port_message(msgCode, bufferSize, flags, timeout, 1423 &message, *port );1602 &message, *portRef); 1424 1603 if (status != B_OK) { 1425 1604 if (status == B_BAD_PORT_ID) { 1426 1605 // the port had to be unlocked and is now no longer there … … writev_port_etc(port_id id, int32 msgCode, const iovec* msgVecs, 1461 1640 } 1462 1641 } 1463 1642 1464 port ->messages.Add(message);1465 port ->read_count++;1643 portRef->messages.Add(message); 1644 portRef->read_count++; 1466 1645 1467 T(Write(id, port ->read_count, port->write_count, message->code,1646 T(Write(id, portRef->read_count, portRef->write_count, message->code, 1468 1647 message->size, B_OK)); 1469 1648 1470 notify_port_select_events(port , B_EVENT_READ);1471 port ->read_condition.NotifyOne();1649 notify_port_select_events(portRef, B_EVENT_READ); 1650 portRef->read_condition.NotifyOne(); 1472 1651 return B_OK; 1473 1652 1474 1653 error: 1475 1654 // Give up our slot in the queue again, and let someone else 1476 1655 // try and fail 1477 T(Write(id, port ->read_count, port->write_count, 0, 0, status));1478 port ->write_count++;1479 notify_port_select_events(port , B_EVENT_WRITE);1480 port ->write_condition.NotifyOne();1656 T(Write(id, portRef->read_count, portRef->write_count, 0, 0, status)); 1657 portRef->write_count++; 1658 notify_port_select_events(portRef, B_EVENT_WRITE); 1659 portRef->write_condition.NotifyOne(); 1481 1660 1482 1661 return status; 1483 1662 } … … 
set_port_owner(port_id id, team_id newTeamID) 1498 1677 BReference<Team> teamReference(team, true); 1499 1678 1500 1679 // get the port 1501 MutexLocker portsLocker(sPortsLock); 1502 Port* port = sPorts.Lookup(id); 1503 if (port == NULL) { 1680 BReference<Port> portRef = get_locked_port(id); 1681 if (portRef == NULL) { 1504 1682 TRACE(("set_port_owner: invalid port_id %ld\n", id)); 1505 1683 return B_BAD_PORT_ID; 1506 1684 } 1507 MutexLocker locker(port ->lock);1685 MutexLocker locker(portRef->lock, true); 1508 1686 1509 1687 // transfer ownership to other team 1510 if (team->id != port->owner) { 1511 list_remove_link(&port->team_link); 1512 list_add_item(&team->port_list, &port->team_link); 1513 port->owner = team->id; 1688 if (team->id != portRef->owner) { 1689 uint8 firstLockIndex = portRef->owner % kTeamListLockCount; 1690 uint8 secondLockIndex = team->id % kTeamListLockCount; 1691 1692 // Avoid deadlocks: always lock lower index first 1693 if (secondLockIndex < firstLockIndex) { 1694 uint8 temp = secondLockIndex; 1695 secondLockIndex = firstLockIndex; 1696 firstLockIndex = temp; 1697 } 1698 1699 MutexLocker oldTeamPortsListLocker(sTeamListLock[firstLockIndex]); 1700 MutexLocker newTeamPortsListLocker; 1701 if (firstLockIndex != secondLockIndex) { 1702 newTeamPortsListLocker.SetTo(sTeamListLock[secondLockIndex], 1703 false); 1704 } 1705 1706 // Now that we have locked the team port lists, check the state again 1707 if (portRef->state == Port::kActive) { 1708 list_remove_link(&portRef->team_link); 1709 list_add_item(&team->port_list, portRef.Get()); 1710 portRef->owner = team->id; 1711 } else { 1712 // Port was already deleted. We haven't changed anything yet so 1713 // we can cancel the operation. 1714 return B_BAD_PORT_ID; 1715 } 1514 1716 } 1515 1717 1516 T(OwnerChange(port , team->id, B_OK));1718 T(OwnerChange(portRef, team->id, B_OK)); 1517 1719 return B_OK; 1518 1720 } 1519 1721 -
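The owner change above is the one place where two of the striped team list locks may be held at once. A standalone sketch of the ordering rule it relies on (std::mutex and the function name are illustrative, not the kernel API):

// Deadlock avoidance when two stripes must be held at the same time.
#include <algorithm>
#include <mutex>

enum { kTeamListLockCount = 8 };

static std::mutex sTeamListLock[kTeamListLockCount];

static void
move_port_between_teams(int oldOwner, int newOwner /*, Port* port */)
{
	unsigned firstIndex = (unsigned)oldOwner % kTeamListLockCount;
	unsigned secondIndex = (unsigned)newOwner % kTeamListLockCount;

	// Always lock the lower index first, so two concurrent owner changes in
	// opposite directions cannot deadlock on each other's stripes.
	if (secondIndex < firstIndex)
		std::swap(firstIndex, secondIndex);

	std::lock_guard<std::mutex> firstLocker(sTeamListLock[firstIndex]);
	if (firstIndex != secondIndex) {
		std::lock_guard<std::mutex> secondLocker(sTeamListLock[secondIndex]);
		// ... unlink from the old team's list, append to the new one ...
		return;
	}

	// Both teams share a stripe; the single lock already covers both lists.
	// ... unlink/append under firstLocker ...
}

int main()
{
	move_port_between_teams(3, 12);
	return 0;
}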
src/system/kernel/team.cpp
diff --git a/src/system/kernel/team.cpp b/src/system/kernel/team.cpp index c5b0f7e..4faf72e 100644
@@ -481,7 +481,7 @@ Team::Team(team_id id, bool kernel)
 	exit.initialized = false;
 
 	list_init(&sem_list);
-	list_init(&port_list);
+	list_init_etc(&port_list, port_team_link_offset());
 	list_init(&image_list);
 	list_init(&watcher_list);
 
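The team.cpp change only swaps list_init() for list_init_etc(): the intrusive kernel list now has to be told where Port::team_link lives, because the vtable added to Port moves it away from offset 0. A toy sketch of an offset-parameterised intrusive list along those lines (the list functions are simplified stand-ins for the kernel's util/list API):

// Intrusive singly linked list whose link member sits at a runtime offset.
#include <stddef.h>
#include <cstdio>

struct list_link {
	void*	next;
};

struct list {
	void*	head;
	size_t	link_offset;	// where the list_link sits inside each item
};

static void
list_init_etc(list* l, size_t linkOffset)
{
	l->head = 0;
	l->link_offset = linkOffset;
}

static list_link*
get_link(list* l, void* item)
{
	return (list_link*)((char*)item + l->link_offset);
}

static void
list_add_item(list* l, void* item)
{
	get_link(l, item)->next = l->head;
	l->head = item;
}

struct Port {
	virtual ~Port() {}	// vtable pushes team_link away from offset 0
	list_link	team_link;
	int			id;
};

static size_t
port_team_link_offset()
{
	Port* port = (Port*)0;
	return (size_t)&port->team_link;
}

int main()
{
	list portList;
	list_init_etc(&portList, port_team_link_offset());

	Port port;
	port.id = 1;
	list_add_item(&portList, &port);

	std::printf("head item id: %d\n", ((Port*)portList.head)->id);
	return 0;
}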