并发编程
POSIX封装
线程
import Darwin
class Thread {
let block: () -> Void
var thread: pthread_t!
init(block: @escaping @Sendable () -> Void) {
self.block = block
}
func start() {
let selfPtr = Unmanaged.passRetained(self).toOpaque()
pthread_create(&thread, nil, { context in
let selfPtr = unsafeBitCast(context, to: Thread.self)
selfPtr.block()
Unmanaged<Thread>.fromOpaque(context).release()
return nil
}, selfPtr);
}
func join() {
pthread_join(thread, nil)
}
}
锁
import Darwin
class Lock: @unchecked Sendable {
var mutex = pthread_mutex_t()
init() {
pthread_mutex_init(&mutex, nil)
}
deinit {
pthread_mutex_destroy(&mutex)
}
func lock() {
pthread_mutex_lock(&mutex)
}
func unlock() {
pthread_mutex_unlock(&mutex)
}
}
线程安全
读写冲突
场景模拟
读一半写
class A: @unchecked Sendable {
var a = 0
}
// 执行一百次以观察到结果为-1的情况
for _ in 0..<100 {
let a = A()
@Sendable
func setA(_ v: Int) {
a.a = v
}
@Sendable
func getA() -> Int {
a.a += 1
// 读一半,主动触发调度,另一个线程把a写为0,得到非预期的结果
pthread_yield_np()
a.a -= 1
return a.a
}
let thread1 = Thread {
print(getA(), terminator: " ")
}
thread1.start()
let thread2 = Thread {
setA(0)
}
thread2.start()
thread1.join()
thread2.join()
}
解决方法
加锁保证读写原子性,读写隔离
(OC可以使用atomic属性修饰)
class A: @unchecked Sendable {
var a = 0
}
for _ in 0..<100 {
let a = A()
let lock = Lock()
@Sendable
func setA(_ v: Int) {
lock.lock()
a.a = v
lock.unlock()
}
@Sendable
func getA() -> Int {
lock.lock()
a.a += 1
pthread_yield_np()
a.a -= 1
lock.unlock()
return a.a
}
let thread1 = Thread {
print(getA(), terminator: " ")
}
thread1.start()
let thread2 = Thread {
setA(0)
}
thread2.start()
thread1.join()
thread2.join()
}
数据竞争
场景模拟
多个读写线程,读了还没写,其它线程又读到没更新的旧值
class A: @unchecked Sendable {
var a = 0
}
// 执行一百次以观察到结果不为10的情况
for _ in 0..<100 {
let a = A()
var threads = [Thread]()
for _ in 0..<10 {
let thread = Thread {
let t = a.a
// 读了还没写,主动触发调度,使其它线程读到没更新的旧值
pthread_yield_np()
a.a = t + 1
}
threads.append(thread)
thread.start()
}
for thread in threads {
thread.join()
}
print(a.a, terminator: " ")
}
解决方法
加锁
class A: @unchecked Sendable {
var a = 0
}
for _ in 0..<100 {
let a = A()
let lock = Lock()
var threads = [Thread]()
for _ in 0..<10 {
let thread = Thread {
lock.lock()
let t = a.a
pthread_yield_np()
a.a = t + 1
lock.unlock()
}
threads.append(thread)
thread.start()
}
for thread in threads {
thread.join()
}
print(a.a, terminator: " ")
}
死锁
场景模拟
线程1持有锁1,线程2持有锁2,线程1等待锁2,线程2等待锁1,互相等待
// 执行一百次以观察到死锁的情况
for _ in 0..<100 {
let lock1 = Lock()
let lock2 = Lock()
let thread1 = Thread {
lock1.lock()
pthread_yield_np()
lock2.lock()
lock2.unlock()
lock1.unlock()
}
thread1.start()
let thread2 = Thread {
lock2.lock()
pthread_yield_np()
lock1.lock()
lock1.unlock()
lock2.unlock()
}
thread2.start()
thread1.join()
thread2.join()
print("no dead lock")
}
解决方法
约定锁的顺序
for _ in 0..<100 {
let lock1 = Lock()
let lock2 = Lock()
let thread1 = Thread {
lock1.lock()
pthread_yield_np()
lock2.lock()
lock2.unlock()
lock1.unlock()
}
thread1.start()
let thread2 = Thread {
lock1.lock()
pthread_yield_np()
lock2.lock()
lock2.unlock()
lock1.unlock()
}
thread2.start()
thread1.join()
thread2.join()
print("no dead lock")
}
GCD
原理模拟
任务队列,线程池循环从队列里取任务执行
并行队列不加锁,线程池里的每个线程,都可以从队列里取任务执行,不能保证执行顺序
串行队列加锁,保证队列的任务执行在同一个线程上,可以保证队列内部的执行顺序
typealias Task = () -> Void
class DispatchQueue: @unchecked Sendable {
var queue: [Task] = []
let lock = Lock()
static let main = DispatchQueue()
static let _global = DispatchQueue()
class func global() -> DispatchQueue {
_global
}
init(label: String = "") {
serialQueue.queue.append(self)
}
func async(block: @escaping Task) {
lock.lock()
queue.append(block)
lock.unlock()
}
func sync(block: Task) {
block()
}
func dequeue() -> Task? {
lock.lock()
defer {
lock.unlock()
}
return queue.isEmpty ? nil : queue.removeFirst()
}
}
class SerialQueue: @unchecked Sendable {
var queue = [DispatchQueue]()
}
let serialQueue = SerialQueue()
let serialLock = Lock()
func dispatchMain() {
let threadPool = (0..<10).map { _ in
Thread {
while let task = DispatchQueue.global().dequeue() {
task()
}
for queue in serialQueue.queue {
serialLock.lock()
while let task = queue.dequeue() {
task()
}
serialLock.unlock()
}
}
}
threadPool.forEach { thread in
thread.start()
}
threadPool.forEach { thread in
thread.join()
}
while let task = DispatchQueue.main.dequeue() {
task()
}
}
原子操作flag,保证多线程条件下只执行一次
(单线程,只用flag就可以保证只执行一次)
import Synchronization
func dispatchOnce(_ done: borrowing Atomic<Bool>, block: @escaping () -> Void) {
if !done.compareExchange(expected: false, desired: true, ordering: .relaxed).original {
block()
}
}
常见用法
命令行程序要手动调用dispatchMain
,UIApplication,NSApplication,RunLoop会自动调
dispatchMain()
主线程执行任务
(更新UI必须在主线程)
DispatchQueue.main.async {
print("main thread")
}
非主线程串行执行任务
let queue = DispatchQueue(label: "")
queue.async {
for i in 0..<3 {
print("a \(i)")
}
}
queue.async {
for i in 0..<3 {
print("b \(i)")
}
}
queue.async {
for i in 0..<3 {
print("c \(i)")
}
}
并行执行任务
DispatchQueue.global().async {
for i in 0..<3 {
print("a \(i)")
}
}
DispatchQueue.global().async {
for i in 0..<3 {
print("b \(i)")
}
}
DispatchQueue.global().async {
for i in 0..<3 {
print("c \(i)")
}
}
延时执行
DispatchQueue.global().asyncAfter(deadline: .now() + .seconds(3)) {
print("execute after 3 seconds")
}
群组
let group = DispatchGroup()
DispatchQueue.global().async(group: group) {
print(1)
}
DispatchQueue.global().async(group: group) {
print(2)
}
DispatchQueue.global().async(group: group) {
print(3)
}
group.notify(queue: .main) {
print("all done")
}
信号量为1时,相当于一个互斥锁
for _ in 0..<100 {
var a = 0
let group = DispatchGroup()
let semaphore = DispatchSemaphore(value: 1)
for _ in 0..<10 {
DispatchQueue.global().async(group: group) {
semaphore.wait()
let t = a
pthread_yield_np()
a = t + 1
semaphore.signal()
}
}
group.notify(queue: .main) {
print(a, terminator: " ")
}
}
OperationQueue
并行执行任务
let queue = OperationQueue()
let operation1 = BlockOperation {
for i in 0..<3 {
print("a \(i)")
}
}
let operation2 = BlockOperation {
for i in 0..<3 {
print("b \(i)")
}
}
let operation3 = BlockOperation {
for i in 0..<3 {
print("c \(i)")
}
}
queue.addOperations([operation1, operation2, operation3], waitUntilFinished: false)
任务完成回调
operation2.completionBlock = {
print("b all done")
}
任务依赖,确保任务A在任务B执行完后再执行
operation1.addDependency(operation2)
Concurrency
func hello(_ name: String) async -> Int {
for i in 0..<3 {
print("\(name) \(i)")
}
return 1
}
串行
let a = await hello("a")
let b = await hello("b")
let c = await hello("c")
print(a + b + c)
并行
async let a = hello("a")
async let b = hello("b")
async let c = hello("c")
print(await a + b + c)
群组
await withTaskGroup(of: Int.self) { group in
for name in ["a", "b", "c"] {
group.addTask {
await hello(name)
}
}
var sum = 0
for await r in group {
sum += r
}
print(sum)
}
结构化
B和A是串行,BA和C是并行,确保A在B执行完后再执行
async let ba = {
let b = await hello("b")
let a = await hello("a")
return a + b
}()
async let c = hello("c")
print(await ba + c)
actor执行者
数据竞争场景模拟
class A: @unchecked Sendable {
var a = 0
}
let a = A()
await withTaskGroup(of: Void.self) { group in
for _ in 0..<10 {
group.addTask {
let t = a.a
sleep(1)
a.a = t + 1
}
}
}
print(a.a)
数据竞争解决方法
actor内部线程隔离,保证线程安全
actor A {
var a = 0
func increase() {
let t = a
sleep(1)
a = t + 1
}
}
let a = A()
await withTaskGroup(of: Void.self) { group in
for _ in 0..<10 {
group.addTask {
await a.increase()
}
}
}
print(await a.a)
锁
只有os_unfair_lock是自旋锁
自旋循环忙等,阻塞调度休眠
短时间用自旋,长时间用阻塞
小于一个时间片的等待,不需要触发调度,就让操作系统等一小会儿,使用自旋锁; 大于一个时间片的等待,不如让出时间片休眠,使用阻塞锁
NSLock
let lock = NSLock()
lock.lock()
// 临界区
lock.unlock()
NSRecursiveLock
let lock = NSRecursiveLock()
lock.lock()
// 临界区
lock.unlock()
NSConditionLock
let lock = NSConditionLock(condition: 2)
lock.lock(whenCondition: 2)
// 临界区
lock.unlock(withCondition: 3)
lock.lock(whenCondition: 3)
// 临界区
lock.unlock(withCondition: 2)
DispatchSemaphore
let semaphore = DispatchSemaphore(value: 1)
semaphore.wait()
// 临界区
semaphore.signal()
UnfairLock
class UnfairLock: @unchecked Sendable {
var unfairLock = os_unfair_lock()
func lock() {
os_unfair_lock_lock(&unfairLock)
}
func unlock() {
os_unfair_lock_unlock(&unfairLock)
}
}
let lock = UnfairLock()
lock.lock()
// 临界区
lock.unlock()
Synchronization Mutex
let a = Mutex(0)
a.withLock { a in
// 临界区
}