多线程&多进程的意义

  • 提高应用程序的响应
  • 有效使地使用多处理器
  • 改进程序结构

多线程/多进程编程需要解决的问题:进程/线程同步和互斥

四种同步模式:

  • 互斥锁:仅允许每次一个线程来执行特定的部分代码或者访问特定数据。
  • 读写锁:允许对受保护的共享资源进行并发读取和独占写入。要修改资源,线程必须首先获取互斥写锁。只有释放所有的读锁之后,才允许使用互斥写锁。
  • 条件变量:会一直阻塞线程,直到特定的条件为真。
  • 信号量:通常用来协调对资源的访问。其中信号计数会初始化为可用资源的数目。然后,线程在资源增加时会增加计数,在删除资源时会减小计数,这些操作都以原子方式执行。计数为零时,尝试减小信号的线程会被阻塞,直到计数大于零为止。可用于异步事件通知

进程间通信方式:

  • 内存共享(shm):创建共享内存区,需要访问这块内存区的进程将其映射到自己的虚拟内存空间,就按内存访问的方式使用了。
  • 消息队列(msg):一个存放消息(数据)容器,一个进程往消息队列中写数据,而另一个进程从消息队列中取数据。
  • 信号量(sem):不能传递复杂消息,只能用来同步
  • 管道(pipe):有命名管道和匿名管道之分,匿名管道只能用于父子进程通讯,命名管道可用于非父子进程,命名管道就是FIFO,管道是单向的先进先出的通讯方式。
  • Unix Socket:使用socket的api,实现进程间双向通信,和普通的socket区别在于传输数据不走网络协议栈
  • signal:通过kill(,SIGUSR1)向指定线程发出明确信号

注意:shm、msg、sem是系统级资源,进程退出后,不会自动销毁。可通过ipcs命令查看系统中创建好的对象。

基于POSIX标准的多线程/进程编程接口

线程的创建

int pthread_create(pthread_t *thread, const pthread_attr_t *attr,
                  void *(*start_routime) (void *), void *arg);
  • 参数attr是线程属性

  • start_routine 是新线程的执行函数

  • arg是透传给start_routine的数据

  • 接口执行成功后,通过thread返回线程ID,该线程ID仅仅是库里的标识,在内核无相应的记录,故仅在当前进程有效。也可通过 pthread_self()接口获取当前线程的ID

  • 线程在内核中的唯一标识(相当于PID)为TID。可通过系统调用来获取:syscall(NR_gettid)

  • 线程的状态信息可从 /proc/<PID>/task/<TID> 中获取

线程属性pthread_attr_t对象的操作接口

  • pthread_attr_init:初始化为缺省值
  • pthread_attr_destroy:销毁属性对象
  • pthread_attr_setdetachstate:设置分离状态
  • pthread_attr_setguardsize:设置栈溢出保护区
  • pthread_attr_setscope:设置竞争CPU的范围(默认PROCESS,即合同进程线程竞争CPU)
  • pthread_attr_setschedpolicy:设置调度策略
  • pthread_attr_setinheritsched:设置调度策略的继承方式(默认新线程不继承创建者线程的调度策略)
  • pthread_attr_setschedparam:设置调度参数(优先级)
  • pthread_attr_setstacksize:设置线程栈大小(32位机默认1MB、64位机默认2MB)
  • pthread_attr_setstack:设置栈地址和大小(可以从堆malloc内存作为线程栈)

线程的分离

int pthread_detach(pthread_t threadd);
  • 使线程处于分离状态,让其自身自灭。效果和通过pthread_attr_setdetachstate设置线程属性相同。
  • 线程处于分离状态后,会在退出时自动回收资源,而不需要其他线程join它

线程的取消

int pthread_cancel(pthread_t thread);
  • 发送Cancel信号给线程。(返回成功不意味着线程终止)
  • 默认情况下(没有设置过Cancellation Type),目标线程收到Cancel信号后,需要运行到取消点(特定的一些系统调用)才能退出。
  • 鉴于以上特点,强烈建议不要使用该接口退出线程。而是采用线程自主退出的方式。

等待线程终止

int pthread_join(pthread_t thread, void **retval);
  • 等待线程结束,并回收线程资源
  • 当目标线程处于非分离状态,必须调用此接口来完成接收,不然就会出现内存泄漏。

线程特定数据(TSD)

  • 单线程 C 程序有两类基本数据:局部数据和全局数据。对于多线程 C 程序,添加了第三类数据:线程特定数据。线程特定数据与全局数据非常相似,区别在于前者为线程专有。
  • 是线程本地存储(TLS)技术在Linux上的实现。
  • TSD基于每个线程进程维护,是定义和引用线程专用数据的唯一方法。
  • 每个TSD项对应一个key,而这个key对进程中所有线程来说是全局的。
  • 实际应用:同一个接口在不同线程中使用线程特定的数据,从而和线程关联起来。典型例子:errno

TSD的使用:

#include <pthread.h>

//创建TSD的键
int pthread_key_create(pthread_key_t *key, void (*destructor)(void*));
//删除TSD键
int pthread_key_delete(pthread_key_t key);
//为指定TSD键设置线程特定绑定
int pthread_setspecific(pthreadd_key_t key, const void *value);
void *pthread_getspecific(pthread_key_t key);
//运行初始化函数init_routine, 并确保init_routine在本进程中只执行一次
int pthread_once(pthread_once_t *once_control, 
                 void (*init_routine)(void));

互斥锁:

//初始化由mp指向的互斥锁(可位于共享内存),mattr为互斥锁属性
int pthread_mutex_init(pthread_mutex_t *mp, 
                       const pthread_mutexattr_t *mattr);
//锁定互斥锁。返回EOWNERDEAD表示所有者挂了。
int pthread_mutex_lock(pthread_mutex_t *mutex);
//解除锁定互斥锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);
//恢复锁的一致性。当锁的所有者死掉后,需先调此接口恢复锁的一致性,再去锁定
int pthread_mutex_consistent(pthread_mutex_t *mutex);
//销毁互斥锁
int pthread_mutex_destroy(pthread_mutex_t *mp);
  • 如果要将mutex应用于进程间同步,需将mp指向共享内存,并设置PTHREAD_PROCESS_SHARED、PTHREAD_MUTEX_ROBUST属性。另外注意要在异常时及时通过调用pthread_mutex_consistent修复锁。
  • 可通过设置互斥锁属性PTHREAD_MUTEX_RECURSIVE,将互斥锁配置成递归锁。即一个线程在锁定该互斥锁后,可以在不释放的情况下再次锁定。

读写锁:

//初始化由rwlock指定的读写锁(可位于共享内存)
int pthread_rwlock_init(pthread_rwlock_t *rwlock, 
                        const pthread_rwlockattr_t *attr);
//获取读锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
//获取写锁
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock );
//解除锁定读写锁
int pthread_rwlock_unlock (pthread_rwlock_t *rwlock);
//销毁读写锁
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
  • 读写锁是用来解决读者和写者同步的问题,读可以有多个同时读,写只允许一个写,另外写的时候不允许读,读的时候不允许写。
  • 设置读写锁属性PTHREAD_PROCESS_SHARED,可允许其他进程共享读写锁

条件变量:

//初始化由cv指向的条件变量(可位于共享内存),cattr为条件变量属性
int pthread_cond_init(pthread_cond_t *cv, const pthread_condattr_t *cattr);
//基于条件变量阻塞。
//以原子方式释放 mp 所指向的互斥锁,并导致调用线程基于 cv 所指向的条件变量阻塞。
//当阻塞被解除后,又原子方式再次获取互斥锁——即使是返回错误
int pthread_cond_wait(pthread_cond_t *cv, pthread_mutex_t *mp);
//在指定时间前阻塞。返回ETIMEDOUT表示等待超时。
int pthread_cond_timedwait(pthread_cond_t *cv, pthread_mutex_t *mp, 
                           const struct timespec *abstime);
//解除一个线程的阻塞。由调度策略决定唤醒哪个线程。
int pthread_cond_signal(pthread_cond_t *cv);
//解除所有线程的阻塞。将导致线程竞争互斥锁
int pthread_cond_broadcast(pthread_cond_t *cv);
//销毁条件变量状态
int pthread_cond_destroy(pthread_cond_t *cv);
  • 通常,对条件的评估是在互斥锁的保护下进行的。如果条件为假,线程会基于条件变量阻塞。当另一个线程更改条件值时,会针对条件变量发出信号。这种变化会导致所有等待该条件的线程解除阻塞并尝试再次获取互斥锁。

  • 为了支持多进程间共享条件变量。需将cv指向共享内存,同时设置条件变量的属性为PTHREAD_PROCESS_SHARED

  • pthread_cond_timewait默认使用的时钟类型是CLOCK_REALTIME,假如系统实时时钟被修改,将导致timewait出现错误。可将条件变量的属性中的时钟类型该为CLOCK_MONOTONIC(单调时钟)。同时在获取当前tv时使用

    struct timespec tv;
    clock_gettime(CLOCK_MONOTONIC, &tv);
    

信号量:

//初始sem指定的信号量(可位于共享内存),pshare非0时表示允许其他进程共享,value是信号量计数初始值
int sem_init(sem_t *sem, int pshared, unsigned int value);
//以原子方式增加 sem 所指示的信号量
int sem_post(sem_t *sem);
//基于信号量阻塞,直到 sem 所指示的信号计数大于零为止,之后以原子方式减小计数
int sem_wait(sem_t *sem);
//销毁信号量
int sem_destroy(sem_t *sem);

共享内存(shm)

//创建共享内存。key一般由ftok生成,size为共享内存大小
int shmget(key_t key, size_t size, int shmflg);
//挂接共享内存。若成功则返回映射后的虚拟地址
void *shmat(int shmid, const void *shmaddr, int shmflg);
//解挂共享内存。使共享内存的引用计数减1
int shmdt(const void *shmaddr);
//销毁共享内存。cmd置为IPC_RMID
int shmctl(int shmid, int cmd, struct shmid_ds *buf);
  • 共享内存是两个正在运行的进程之间共享和传递数据的一种非常有效的方式。是最高效的IPC方式
  • 共享内存没有提供同步机制,这使得我们在使用共享内存进行进程间通信时,往往需要借助其他手段来保证进程之间的同步。如:信号量
  • 可通过ipcs命令查看和删除共享内存

消息队列(msg)

//创建消息队列。key要求整个系统唯一,一般由ftok生成
int msgget(key_t key, int msgflg);
//往消息队列发送消息
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
//从消息队列读取消息
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
                      int msgflg);
//删除消息队列。cmd置为IPC_RMID
int msgctl(int msqid, int cmd, struct msqid_ds *buf);
  • 可通过ipcs命令查看和删除消息队列

信号量(sem)

//创建信号量。key一般由ftok生成,nsems为信号计数初始值
int semget(key_t key, int nsems, int semflg);
//操作信号量。sops->sem_op为-1时表示P操作,+1时表示V操作
int semop(int semid, struct sembuf *sops, unsigned nsops);
//销毁信号量。cmd置为IPC_RMID
int semctl(int semid, int semnum, int cmd, ...);
  • 用来解决进程间的同步与互斥问题的一种进程间通信机制,包括一个称为信号量的变量和在该信号量下等待资源的进程等待队列,以及对信号量进行的两个原子操作(P/V操作)
  • 和shm和msg一样,sem不会随进程退出而销毁,可通过ipcs命令查看和删除信号量

管道(pipe)

  • 管道是一种最基本的IPC机制
  • 规定数据从管道的写端流入管道,从读端流出。
  • 原理: 管道实为内核使用环形队列机制,借助内核缓冲区(4k)实现。其本质是一个伪文件
  • 数据不可反复读取。一旦被读走,便不在管道中存在
  • 数据只能在一个方向上流动。
  • 管道分为匿名管道和命名管道
//匿名管道:通过系统调用pipe()来实现,只能在有亲缘关系的进程间使用
//创建管道成功后,得到两个文件描述符fd[0]、fd[1],分别指向管道的读端和写端
//接下来可以通过read/write标准的文件读写接口操作管道,实现数据传输
int pipe(int fd[2]);
//命名管道:和匿名管道类似,但不局限于亲緣关系的进程。它基于管道文件,因此我们首先要创建一个管道文件:
int mkfifo(const char *name,mode_t mode);
//调用以上系统调用的效果,和通过mkfifo命令创建的一样。

//创建管道文件后,需要调用open打开管道文件
int fd=open(name,O_RDONLY);   //读
int fd=open(name,O_WRONLY);  //写

//接下来就可以和匿名管道一样通过read/write进行读写了

管道的读写行为:

  • 如果所有指向管道读端的文件描述符都关闭了(管道读端引用计数为0),这时有进程向管道的写端write,那么该进程会收到信号SIGPIPE,通常会导致进程异常终止。当然也可以对SIGPIPE信号实施捕捉,不终止进程。
  • 如果所有指向管道写端的文件描述符都关闭了(管道写端引用计数为0),而仍然有进程从管道的读端读数据,那么管道中剩余的数据都被读取后,再次read会返回0,就像读到文件末尾一样。
  • 如果有指向管道写端的文件描述符没关闭(管道写端引用计数大于0),而持有管道写端的进程也没有向管道中写数据,这时有进程从管道读端读数据,那么管道中剩余的数据都被读取后,再次read会阻塞,直到管道中有数据可读了才读取数据并返回。

Unix Socket

  • UNIX Domain SOCKET 是在Socket架构上发展起来的用于同一台主机的进程间通信。它不需要经过网络协议栈,不需要打包拆包、计算校验和、维护序列号应答等。只是将应用层数据从一个进程拷贝到另一个进程。
  • UNIX Domain SOCKET有SOKCET_DGRAM和SOCKET_STREAM两种模式,类似于UDP和TCP,但是面向消息的UNIX socket也是可靠的,消息既不会丢失也不会乱序。
  • 和pipe不一样,UNIX socket通讯是全双工的
  • 使用UNIX Socket的过程和网络socket十分相似。也要先调用socket()创建一个socket文件描述符,domain指定为AF_UNIX
  • 与网络socket编程最明显的不同在于地址格式不同,用结构体sockaddr_un表示,网络编程的socket地址是IP地址加端口号,而UNIX Domain Socket的地址是一个socket类型的文件在文件系统中的路径,这个socket文件由bind()调用创建,如果调用bind()时该文件已存在,则bind()错误返回。

signal

  • 信号机制是进程之间相互传递消息的一种方法,信号全称为软中断信号。从它的命名可以看出,它的实质是软中断
  • 许多重要的程序都需要处理信号。比如,终端用户输入了 ctrl+c 来中断程序,会通过信号机制停止一个程序。
  • 每个信号都有一个名字和编号,这些名字都以“SIG”开头,例如“SIGIO ”、“SIGCHLD”等等。信号定义在signal.h头文件中,信号名都定义为正整数。信号从1开始编号。
  • 信号的处理方式有三种:
    • 忽略:SIGKILL和SIGSTOP不能忽略
    • 捕捉:由用户自定义信号处理函数,SIGKILL和SIGSTOP不能被捕捉
    • 默认动作:对于每种信号,系统都有默认的处理方式,可通过”man 7 signal”查看
  • 可通过kill命令向进程发送信号,如发送SIGKILL(编号9)信号给PID为1555的进程:kill -9 1555

signal()函数不能携带数据,如果要携带数据,则需要使用高级版的接口:

int sigaction(int signum, const struct sigaction *act, 
              struct sigaction *oldact);   // 注册

int sigqueue(pid_t pid, int sig, const union sigval value);   // 发送