并发编程

线程

创建线程和终止线程

import Darwin.POSIX.pthread.pthread

// Spawn three threads. Each one receives its loop index through a small heap
// box, because the start routine passed to pthread_create is a C function
// pointer and therefore cannot capture `i` directly.
for i in 0..<3 {
    let pi = UnsafeMutablePointer<Int>.allocate(capacity: 1)
    pi.initialize(to: i)
    
    var tid: pthread_t?
    let rc = pthread_create(&tid, nil, { p in
        // The new thread owns the box: copy the value out, then release it.
        let i = p.load(as: Int.self)
        p.deallocate()
        
        if i == 0 {
            // Ends only the calling thread; the print below is skipped for it.
            pthread_exit(nil)
        }
        
        print("thread \(pthread_self()): \(i)")
        
        return nil
    }, pi)
    
    if rc != 0 {
        // The thread never started, so nobody else will free the argument.
        pi.deallocate()
        print("pthread_create failed: \(String(cString: strerror(rc)))")
    }
}

等待回收线程

import Darwin.POSIX.pthread.pthread

// Spawn three threads one after another and wait for each to finish before
// starting the next. The loop index is handed over through a heap box because
// the start routine is a C function pointer and cannot capture `i`.
for i in 0..<3 {
    let pi = UnsafeMutablePointer<Int>.allocate(capacity: 1)
    pi.initialize(to: i)
    
    var tid: pthread_t?
    let rc = pthread_create(&tid, nil, { p in
        // The new thread owns the box: copy the value out, then release it.
        let i = p.load(as: Int.self)
        p.deallocate()
        
        if i == 0 {
            // Ends only the calling thread; the print below is skipped for it.
            pthread_exit(nil)
        }
        
        print("thread \(pthread_self()): \(i)")
        
        return nil
    }, pi)
    
    // Only join a thread that was actually created; `tid` stays nil otherwise
    // and force-unwrapping it would crash.
    guard rc == 0, let thread = tid else {
        pi.deallocate()
        print("pthread_create failed: \(String(cString: strerror(rc)))")
        continue
    }
    
    // Block until the thread terminates and reclaim its resources.
    pthread_join(thread, nil)
}

分离线程

// Detach the calling thread so that no other thread has to join it: the
// resources of a detached thread are reclaimed automatically when it ends.
let currentThread = pthread_self()
pthread_detach(currentThread)

基于线程的并发服务端

import Darwin.POSIX.sys.socket
import Darwin.POSIX.netdb
import Darwin.POSIX.pthread.pthread

// Describe the endpoint we want: an IPv4 TCP socket on port 3000.
// NOTE: without AI_PASSIVE and with a nil host, getaddrinfo yields the
// loopback address, so this server is reachable from the local machine only.
var hints = addrinfo()
hints.ai_family = AF_INET
hints.ai_socktype = SOCK_STREAM
hints.ai_protocol = IPPROTO_IP

// `info` keeps pointing at the head of the result list so the whole list can
// be released with a single freeaddrinfo call.
var info: UnsafeMutablePointer<addrinfo>?
let status = getaddrinfo(nil, "3000", &hints, &info)
if status != 0 {
    fatalError("getaddrinfo failed: \(String(cString: gai_strerror(status)))")
}

// Walk the candidates with a separate cursor and keep the first socket that
// binds successfully.
var fd: Int32 = -1
var candidate = info
while let entry = candidate {
    let sock = socket(entry.pointee.ai_family, entry.pointee.ai_socktype, entry.pointee.ai_protocol)
    
    if sock >= 0 {
        if bind(sock, entry.pointee.ai_addr, entry.pointee.ai_addrlen) == 0 {
            fd = sock
            break
        }
        
        // This candidate is unusable; do not leak its descriptor.
        close(sock)
    }
    
    candidate = entry.pointee.ai_next
}

// The address list is no longer needed once a socket is bound (or all failed).
freeaddrinfo(info)
info = nil

if fd < 0 {
    fatalError("could not bind a listening socket on port 3000")
}

if listen(fd, 1000) != 0 {
    fatalError("listen failed: \(String(cString: strerror(errno)))")
}

// Accept connections forever and serve each one on its own detached thread
// that echoes back whatever the client sends.
while true {
    var addr = sockaddr_in()
    // accept() treats addrlen as the capacity of `addr` on input, so it must
    // start at the size of the structure rather than 0.
    var addrlen = socklen_t(MemoryLayout<sockaddr_in>.size)
    
    let cfd = withUnsafeMutablePointer(to: &addr) {
        $0.withMemoryRebound(to: sockaddr.self, capacity: 1) {
            accept(fd, $0, &addrlen)
        }
    }
    
    if cfd < 0 {
        // Nothing was accepted (e.g. EINTR or an aborted connection); do not
        // hand an invalid descriptor to a worker thread.
        if errno != EINTR {
            perror("accept")
        }
        continue
    }
    
    // The descriptor is boxed on the heap because the start routine is a C
    // function pointer and cannot capture `cfd`.
    let cfdp = UnsafeMutablePointer<Int32>.allocate(capacity: 1)
    cfdp.initialize(to: cfd)
    
    var tid: pthread_t?
    let rc = pthread_create(&tid, nil, { p in
        // The worker owns the box: copy the descriptor out, then release it.
        let cfd = p.load(as: Int32.self)
        
        // Nobody joins workers, so let the system reclaim them on exit.
        pthread_detach(pthread_self())
        
        p.deallocate()
        
        let buffer = UnsafeMutableBufferPointer<CChar>.allocate(capacity: 30)
        defer {
            buffer.deallocate()
        }
        
        echo: while true {
            // 0 means the peer closed the connection, a negative value is an
            // error; either way this connection is finished.
            let received = recv(cfd, buffer.baseAddress, buffer.count, 0)
            if received <= 0 {
                break
            }
            
            // Echo exactly the bytes that arrived; send() may write fewer
            // bytes than requested, so loop until all of them are out.
            var sent = 0
            while sent < received {
                let written = send(cfd, buffer.baseAddress! + sent, received - sent, 0)
                if written <= 0 {
                    break echo
                }
                sent += written
            }
        }
        
        close(cfd)
        
        return nil
    }, cfdp)
    
    if rc != 0 {
        // The worker never started, so release what it would have owned.
        cfdp.deallocate()
        close(cfd)
        print("pthread_create failed: \(String(cString: strerror(rc)))")
    }
}

信号量

使用互斥信号量同步线程的共享变量

import Darwin.POSIX.pthread.pthread

// Shared counter incremented by two threads; every increment is guarded by a
// binary (initial value 1) named semaphore used as a mutex.
var counter = 0

// A named semaphore outlives the process. If an earlier run crashed before
// unlinking it, plain O_CREAT would reopen the stale object with whatever
// value it was left at (possibly 0, which would deadlock). Remove any
// leftover first and insist on creating a fresh one.
sem_unlink("mutex")
let mutex = sem_open("mutex", O_CREAT | O_EXCL, S_IRWXU, 1)
if mutex == nil || mutex == SEM_FAILED {
    fatalError("sem_open failed: \(String(cString: strerror(errno)))")
}

// Start routine shared by both threads: add 100_000 to the counter, one
// semaphore-protected increment at a time.
let increment: @convention(c) (UnsafeMutableRawPointer) -> UnsafeMutableRawPointer? = { _ in
    for _ in 0..<100_000 {
        sem_wait(mutex)
        counter += 1
        sem_post(mutex)
    }
    
    return nil
}

var tid1: pthread_t?
var tid2: pthread_t?

let rc1 = pthread_create(&tid1, nil, increment, nil)
let rc2 = pthread_create(&tid2, nil, increment, nil)

// Join only the threads that were actually created; a failed create leaves
// the corresponding id nil.
if rc1 == 0, let first = tid1 {
    pthread_join(first, nil)
} else {
    print("pthread_create failed: \(String(cString: strerror(rc1)))")
}

if rc2 == 0, let second = tid2 {
    pthread_join(second, nil)
} else {
    print("pthread_create failed: \(String(cString: strerror(rc2)))")
}

// Release the handle and remove the name so nothing is left behind.
sem_close(mutex)
sem_unlink("mutex")

// With both threads completed this prints 200000.
print(counter)