并发编程

POSIX封装

线程

import Darwin

class Thread {
    let block: () -> Void
    var thread: pthread_t!
    
    init(block: @escaping @Sendable () -> Void) {
        self.block = block
    }
    
    func start() {
        let selfPtr = Unmanaged.passRetained(self).toOpaque()
        pthread_create(&thread, nil, { context in
            let selfPtr = unsafeBitCast(context, to: Thread.self)
            selfPtr.block()
            Unmanaged<Thread>.fromOpaque(context).release()
            return nil
        }, selfPtr);
    }
    
    func join() {
        pthread_join(thread, nil)
    }
}

import Darwin

class Lock: @unchecked Sendable {
    var mutex = pthread_mutex_t()
    
    init() {
        pthread_mutex_init(&mutex, nil)
    }
    
    deinit {
        pthread_mutex_destroy(&mutex)
    }
    
    func lock() {
        pthread_mutex_lock(&mutex)
    }
    
    func unlock() {
        pthread_mutex_unlock(&mutex)
    }
}

线程安全

读写冲突

场景模拟

读一半写

class A: @unchecked Sendable {
    var a = 0
}

// 执行一百次以观察到结果为-1的情况
for _ in 0..<100 {
    let a = A()
    @Sendable
    func setA(_ v: Int) {
        a.a = v
    }
    @Sendable
    func getA() -> Int {
        a.a += 1
        // 读一半,主动触发调度,另一个线程把a写为0,得到非预期的结果
        pthread_yield_np()
        a.a -= 1
        return a.a
    }
    
    let thread1 = Thread {
        print(getA(), terminator: " ")
    }
    thread1.start()
    
    let thread2 = Thread {
        setA(0)
    }
    thread2.start()
    
    thread1.join()
    thread2.join()
}

解决方法

加锁保证读写原子性,读写隔离
(OC可以使用atomic属性修饰)

class A: @unchecked Sendable {
    var a = 0
}

for _ in 0..<100 {
    let a = A()
    let lock = Lock()
    @Sendable
    func setA(_ v: Int) {
        lock.lock()
        a.a = v
        lock.unlock()
    }
    @Sendable
    func getA() -> Int {
        lock.lock()
        a.a += 1
        pthread_yield_np()
        a.a -= 1
        lock.unlock()
        return a.a
    }
    
    let thread1 = Thread {
        print(getA(), terminator: " ")
    }
    thread1.start()
    
    let thread2 = Thread {
        setA(0)
    }
    thread2.start()
    
    thread1.join()
    thread2.join()
}

数据竞争

场景模拟

多个读写线程,读了还没写,其它线程又读到没更新的旧值

class A: @unchecked Sendable {
    var a = 0
}

// 执行一百次以观察到结果不为10的情况
for _ in 0..<100 {
    let a = A()
    
    var threads = [Thread]()
    
    for _ in 0..<10 {
        let thread = Thread {
            let t = a.a
            // 读了还没写,主动触发调度,使其它线程读到没更新的旧值
            pthread_yield_np()
            a.a = t + 1
        }
        threads.append(thread)
        thread.start()
    }
    
    for thread in threads {
        thread.join()
    }
    
    print(a.a, terminator: " ")
}

解决方法

加锁

class A: @unchecked Sendable {
    var a = 0
}

for _ in 0..<100 {
    let a = A()
    let lock = Lock()
    
    var threads = [Thread]()
    
    for _ in 0..<10 {
        let thread = Thread {
            lock.lock()
            let t = a.a
            pthread_yield_np()
            a.a = t + 1
            lock.unlock()
        }
        threads.append(thread)
        thread.start()
    }
    
    for thread in threads {
        thread.join()
    }
    
    print(a.a, terminator: " ")
}

死锁

场景模拟

线程1持有锁1,线程2持有锁2,线程1等待锁2,线程2等待锁1,互相等待

// 执行一百次以观察到死锁的情况
for _ in 0..<100 {
    let lock1 = Lock()
    let lock2 = Lock()

    let thread1 = Thread {
        lock1.lock()
        pthread_yield_np()
        lock2.lock()
        lock2.unlock()
        lock1.unlock()
    }
    thread1.start()

    let thread2 = Thread {
        lock2.lock()
        pthread_yield_np()
        lock1.lock()
        lock1.unlock()
        lock2.unlock()
    }
    thread2.start()

    thread1.join()
    thread2.join()

    print("no dead lock")
}

解决方法

约定锁的顺序

for _ in 0..<100 {
    let lock1 = Lock()
    let lock2 = Lock()

    let thread1 = Thread {
        lock1.lock()
        pthread_yield_np()
        lock2.lock()
        lock2.unlock()
        lock1.unlock()
    }
    thread1.start()

    let thread2 = Thread {
        lock1.lock()
        pthread_yield_np()
        lock2.lock()
        lock2.unlock()
        lock1.unlock()
    }
    thread2.start()

    thread1.join()
    thread2.join()

    print("no dead lock")
}

GCD

原理模拟

任务队列,线程池循环从队列里取任务执行
并行队列不加锁,线程池里的每个线程,都可以从队列里取任务执行,不能保证执行顺序
串行队列加锁,保证队列的任务执行在同一个线程上,可以保证队列内部的执行顺序

typealias Task = () -> Void

class DispatchQueue: @unchecked Sendable {
    var queue: [Task] = []
    let lock = Lock()
    
    static let main = DispatchQueue()
    static let _global = DispatchQueue()
    class func global() -> DispatchQueue {
        _global
    }
    
    init(label: String = "") {
        serialQueue.queue.append(self)
    }
    
    func async(block: @escaping Task) {
        lock.lock()
        queue.append(block)
        lock.unlock()
    }
    
    func sync(block: Task) {
        block()
    }
    
    func dequeue() -> Task? {
        lock.lock()
        defer {
            lock.unlock()
        }
        return queue.isEmpty ? nil : queue.removeFirst()
    }
}

class SerialQueue: @unchecked Sendable {
    var queue = [DispatchQueue]()
}
let serialQueue = SerialQueue()
let serialLock = Lock()

func dispatchMain() {
    let threadPool = (0..<10).map { _ in
        Thread {
            while let task = DispatchQueue.global().dequeue() {
                task()
            }
            for queue in serialQueue.queue {
                serialLock.lock()
                while let task = queue.dequeue() {
                    task()
                }
                serialLock.unlock()
            }
        }
    }
    threadPool.forEach { thread in
        thread.start()
    }
    threadPool.forEach { thread in
        thread.join()
    }
    while let task = DispatchQueue.main.dequeue() {
        task()
    }
}

原子操作flag,保证多线程条件下只执行一次
(单线程,只用flag就可以保证只执行一次)

import Synchronization

func dispatchOnce(_ done: borrowing Atomic<Bool>, block: @escaping () -> Void) {
    if !done.compareExchange(expected: false, desired: true, ordering: .relaxed).original {
        block()
    }
}

常见用法

命令行程序要手动调用dispatchMain,UIApplication,NSApplication,RunLoop会自动调

dispatchMain()

主线程执行任务
(更新UI必须在主线程)

DispatchQueue.main.async {
    print("main thread")
}

非主线程串行执行任务

let queue = DispatchQueue(label: "")
queue.async {
    for i in 0..<3 {
        print("a \(i)")
    }
}
queue.async {
    for i in 0..<3 {
        print("b \(i)")
    }
}
queue.async {
    for i in 0..<3 {
        print("c \(i)")
    }
}

并行执行任务

DispatchQueue.global().async {
    for i in 0..<3 {
        print("a \(i)")
    }
}
DispatchQueue.global().async {
    for i in 0..<3 {
        print("b \(i)")
    }
}
DispatchQueue.global().async {
    for i in 0..<3 {
        print("c \(i)")
    }
}

延时执行

DispatchQueue.global().asyncAfter(deadline: .now() + .seconds(3)) {
    print("execute after 3 seconds")
}

群组

let group = DispatchGroup()
DispatchQueue.global().async(group: group) {
    print(1)
}
DispatchQueue.global().async(group: group) {
    print(2)
}
DispatchQueue.global().async(group: group) {
    print(3)
}
group.notify(queue: .main) {
    print("all done")
}

信号量为1时,相当于一个互斥锁

for _ in 0..<100 {
    var a = 0
    let group = DispatchGroup()
    let semaphore = DispatchSemaphore(value: 1)
    
    for _ in 0..<10 {
        DispatchQueue.global().async(group: group) {
            semaphore.wait()
            let t = a
            pthread_yield_np()
            a = t + 1
            semaphore.signal()
        }
    }
    
    group.notify(queue: .main) {
        print(a, terminator: " ")
    }
}

OperationQueue

并行执行任务

let queue = OperationQueue()

let operation1 = BlockOperation {
    for i in 0..<3 {
        print("a \(i)")
    }
}

let operation2 = BlockOperation {
    for i in 0..<3 {
        print("b \(i)")
    }
}

let operation3 = BlockOperation {
    for i in 0..<3 {
        print("c \(i)")
    }
}

queue.addOperations([operation1, operation2, operation3], waitUntilFinished: false)

任务完成回调

operation2.completionBlock = {
    print("b all done")
}

任务依赖,确保任务A在任务B执行完后再执行

operation1.addDependency(operation2)

Concurrency

func hello(_ name: String) async -> Int {
    for i in 0..<3 {
        print("\(name) \(i)")
    }
    return 1
}

串行

let a = await hello("a")
let b = await hello("b")
let c = await hello("c")
print(a + b + c)

并行

async let a = hello("a")
async let b = hello("b")
async let c = hello("c")
print(await a + b + c)

群组

await withTaskGroup(of: Int.self) { group in
    for name in ["a", "b", "c"] {
        group.addTask {
            await hello(name)
        }
    }
    
    var sum = 0
    for await r in group {
        sum += r
    }
    print(sum)
}

结构化
B和A是串行,BA和C是并行,确保A在B执行完后再执行

async let ba = {
    let b = await hello("b")
    let a = await hello("a")
    return a + b
}()
async let c = hello("c")
print(await ba + c)

actor执行者

数据竞争场景模拟

class A: @unchecked Sendable {
    var a = 0
}

let a = A()

await withTaskGroup(of: Void.self) { group in
    for _ in 0..<10 {
        group.addTask {
            let t = a.a
            sleep(1)
            a.a = t + 1
        }
    }
}

print(a.a)

数据竞争解决方法

actor内部线程隔离,保证线程安全

actor A {
    var a = 0
    
    func increase() {
        let t = a
        sleep(1)
        a = t + 1
    }
}

let a = A()

await withTaskGroup(of: Void.self) { group in
    for _ in 0..<10 {
        group.addTask {
            await a.increase()
        }
    }
}

print(await a.a)

只有os_unfair_lock是自旋锁

自旋循环忙等,阻塞调度休眠
短时间用自旋,长时间用阻塞

小于一个时间片的等待,不需要触发调度,就让操作系统等一小会儿,使用自旋锁; 大于一个时间片的等待,不如让出时间片休眠,使用阻塞锁

NSLock

let lock = NSLock()

lock.lock()
// 临界区
lock.unlock()

NSRecursiveLock

let lock = NSRecursiveLock()

lock.lock()
// 临界区
lock.unlock()

NSConditionLock

let lock = NSConditionLock(condition: 2)

lock.lock(whenCondition: 2)
// 临界区
lock.unlock(withCondition: 3)
            
lock.lock(whenCondition: 3)
// 临界区
lock.unlock(withCondition: 2)

DispatchSemaphore

let semaphore = DispatchSemaphore(value: 1)

semaphore.wait()
// 临界区
semaphore.signal()

UnfairLock

class UnfairLock: @unchecked Sendable {
    var unfairLock = os_unfair_lock()
    
    func lock() {
        os_unfair_lock_lock(&unfairLock)
    }
    
    func unlock() {
        os_unfair_lock_unlock(&unfairLock)
    }
}

let lock = UnfairLock()

lock.lock()
// 临界区
lock.unlock()

Synchronization Mutex

let a = Mutex(0)

a.withLock { a in
    // 临界区
}