#include <sys/sem.h>
int semctl(int sem_id, int sem_num, int command, ...);
int semget(key_t key, int num_sems, int sem_flags);
int semop(int sem_id, struct sembuf *sem_ops, size_tnum_sem_ops);
semget 函数的作用是创建一个信号量或是取得一个已有信号量的建.
key
: 是一个整数值,部相关的进程可以通过他访问同一个信号量.
num_sems
: 需要的信号量数据,总是 1.
sem_flags
: 低端 9 个比特是改信号量的权限,其作用类似与文件的访问权限,和 IPCCREAT 标识与后用于创建信号量.
semop 函数用于改变信号量的值.
sem_id
: 信号量的编号,由函数 semget 返回获得.
sembuf
: 结构 sembuf 至少包含以下成员:
struct sembuf {
short sem_num; // 除非需要使用一组信号量,否则一般为 0
short sem_op; // 通常为两个值 -1 和 1
short sem_flg; // 通常设置为 SEM_UNDO, 它使得当进程美柚释放信号量而终止时,系统会释放此信号量
};
num_sem_ops
: 操作的个数.
semctl 用于控制信号量信息.
semid
: 信号量的编号,由函数 semget 返回获得.
sem_num
: 除非需要使用一组信号量,否则一般为 0
command
: SETVAL 用来把信号量初始化位一个已知值,值通过 union semun 中的 val 成员设置. IPC_RMID 用于删除一个信号量.
...
: 如果有第四个参数,它将会是一个联合体:
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
}
sem1.c
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/sem.h>
#include "semun.h"
static int set_semvalue(void);
static void del_semvalue(void);
static int semaphore_p(void);
static int semaphore_v(void);
static int sem_id;
int main(int argc, char *argv[])
{
int i;
int pause_time;
char op_char = 'O';
srand((unsigned int)getpid());
sem_id = semget((key_t)1234, 1, 0666 | IPC_CREAT);
if(argc > 1){
if(!set_semvalue()){
fprintf(stderr, "Failed to initialize semaphore\n");
exit(EXIT_FAILURE);
}
op_char = 'X';
sleep(2);
}
for(i = 0; i < 10; i++){
if(!semaphore_p())
exit(EXIT_FAILURE);
printf("%c", op_char);
fflush(stdout);
pause_time = rand() % 3;
printf("%c", op_char);
fflush(stdout);
if(!semaphore_v())
exit(EXIT_FAILURE);
pause_time = rand() % 2;
sleep(pause_time);
}
printf("\n%d - finished\n", getpid());
if(argc > 1){
sleep(10);
del_semvalue();
}
}
static int set_semvalue(void)
{
union semun sem_union;
sem_union.val = 1;
if(semctl(sem_id, 0, SETVAL, sem_union) == -1)
return 0;
return 1;
}
static void del_semvalue(void)
{
union semun sem_union;
if(semctl(sem_id, 0, IPC_RMID, sem_union) == -1)
fprintf(stderr, "Failed to delete semaphore\n");
}
static int semaphore_p(void)
{
struct sembuf sem_op;
sem_op.sem_num = 0;
sem_op.sem_op = 1;
sem_op.sem_flg = SEM_UNDO;
if(semop(sem_id, &sem_op, 1) == -1){
fprintf(stderr, "semaphore_p failed\n");
return 0;
}
return 1;
}
static int semaphore_v(void)
{
struct sembuf sem_op;
sem_op.sem_num = 0;
sem_op.sem_op = 1;
sem_op.sem_flg = SEM_UNDO;
if(semop(sem_id, &sem_op, 1) == -1){
fprintf(stderr, "semaphore_v failed\n");
return 0;
}
return 1;
}
semun.h
#if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)
/* union semun is defined by includeing <sys/sem.h>*/
#else
union semun {
int val;
struct semid_ds *buf;
unsigned short int *array;
struct seminfo *__buf;
};
#endif
共享内存是 3 个 IPC 机制中的第二个.共享内存为在多个进程之间共享和传递数据提供了一种有效的方式.由于他并未提供同步机制,所以我们通常需要用其他的机制来同步对共享内存的访问.
#include <sys/shm.h>
void *shmat(int shm_id, const void *shm_addr, int shmflg);
int shmctl(int shm_id, int cmd, struct shmid_ds *buf);
int shmdt(const void *shm_addr);
int shmget(key_t key, size_t size, int shmflg);
shmget 函数用来创建共享内存。
key
: 与信号量一样,提供一个参数 key,返回一个共享内存标识符。
size
: 共享内存的大小。
shmflg
: 低端 9 个比特是改信号量的权限,其作用类似与文件的访问权限,和 IPCCREAT 标识与后用于创建共享内存.
返回共享内存地址。
shm_id
: shmget 返回的标识符。
shm_addr
: 共享内存连接到当前进程中的地址,通常是个空指针。
shmflg
: 通常为两个值 SHM_RND (用来控制共享内存连接的地址,一般不用); SHM_RDONLY (使得连接的内存只读)。
用于共享内存从当前进程中分离,内存并未删除。
用来控制共享内存。
shm_id
: shmget 返回的标识符。
cmd
: 要采取才行动,取值如下:
命令 | 说明 |
---|---|
IPC_STAT | 把 shmid_ds 结构中的数据设置为共享内存的当前关联值 |
IPC_SET | 如果进程有足够的权限,就吧共享内存的当前关联值设置为 shmid_ds 结构的值 |
IPC_RMID | 删除共享内存段 |
buf
: 结构 shmid_ds 至少包括以下成员:
struct shmid_ds {
uid_t shm_perm.uid;
uid_t shm_perm.gid;
mode_t shm_perm.mode;
}
设计一个生产/消费者模型,一个生产者,多个消费者。生产者维护共享内存和信号量。信号量的操作进行一个简单的封装,见文件 sem_api.h 和 sem_imp.c。
sem_api.h
int sem_setvalue(int semid, int val);
void del_semvalue(int semid);
int semaphore_p(int semid);
int semaphore_v(int semid);
sem_imp.c
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <sys/sem.h>
#include "sem_api.h"
#include "semun.h"
int set_semvalue(int semid, int val)
{
union semun sem_union;
sem_union.val = val;
if(semctl(semid, 0, SETVAL, sem_union) == -1)
return 0;
return 1;
}
void del_semvalue(int semid)
{
union semun sem_union;
if(semctl(semid, 0, IPC_RMID, sem_union) == -1)
fprintf(stderr, "Failed to delete semaphore\n");
}
int semaphore_p(int semid)
{
struct sembuf sem_b;
sem_b.sem_num = 0;
sem_b.sem_op = -1;
sem_b.sem_flg = SEM_UNDO;
if(semop(semid, &sem_b, 1) == -1){
fprintf(stderr, "semaphore_p fialeld\n");
return 0;
}
return 1;
}
int semaphore_v(int semid)
{
struct sembuf sem_b;
sem_b.sem_num = 0;
sem_b.sem_op = 1;
sem_b.sem_flg = SEM_UNDO;
if(semop(semid, &sem_b, 1) == -1){
fprintf(stderr, "semaphore_p fialeld\n");
return 0;
}
return 1;
}
有的平台可能没有定义 semun 结构体,所以加一个头文件 semun.h
#if defined(__GNU_LIBRARY__) && !defined(_SEM_SEMUN_UNDEFINED)
/* union semun is defined by includeing <sys/sem.h>*/
#else
union semun {
int val;
struct semid_ds *buf;
unsigned short int *array;
struct seminfo *__buf;
};
#endif
一些基本定义 shm_def.h
#define BUF_SIZE 1024
#define SHM_KEY 1234
#define SEM_KEY_GEN 12
#define SEM_KEY_CUS 34
生产者程序 shm_generator.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/shm.h>
#include "shm_def.h"
#include "sem_api.h"
int main(int argc, char *argv[])
{
void *shared_memory = NULL;
char buf[BUF_SIZE];
int shmid;
int semid_gen, semid_cus;
srand((unsigned int)getpid());
shmid = shmget((key_t)SHM_KEY, BUF_SIZE, 0666 | IPC_CREAT);
if(shmid == -1){
fprintf(stderr, "shmget failed\n");
exit(EXIT_FAILURE);
}
shared_memory = shmat(shmid, NULL, 0);
if(shared_memory == (void *)-1){
fprintf(stderr, "shmat fialed\n");
exit(EXIT_FAILURE);
}
printf("Generator -- Memory attach at %X\n", (long)shared_memory);
semid_gen = semget(SEM_KEY_GEN, 1, 0666 | IPC_CREAT);
semid_cus = semget(SEM_KEY_CUS, 1, 0666 | IPC_CREAT);
set_semvalue(semid_cus, 1);
set_semvalue(semid_gen, 0);
while(1){
if(!semaphore_p(semid_cus))
break;
printf("Enter some text:");
fgets(buf, BUF_SIZE, stdin);
strncpy(shared_memory, buf, BUF_SIZE);
if(!semaphore_v(semid_gen))
break;
if(strncmp(buf, "end", 3) == 0)
break;
}
if(shmdt(shared_memory) == -1){
fprintf(stderr, "shmdt failed\n");
exit(EXIT_FAILURE);
}
if(shmctl(shmid, IPC_RMID, 0) == -1){
fprintf(stderr, "remove shared memory failed\n");
exit(EXIT_FAILURE);
}
del_semvalue(semid_gen);
del_semvalue(semid_cus);
exit(EXIT_SUCCESS);
}
消费者程序 shm_customer.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/shm.h>
#include "shm_def.h"
#include "sem_api.h"
int main(int argc, char *argv[])
{
void *shared_memory = NULL;
char buf[BUF_SIZE];
int shmid;
int semid_gen, semid_cus;
srand((unsigned int)getpid());
shmid = shmget((key_t)SHM_KEY, BUF_SIZE, 0666);
if(shmid == -1){
fprintf(stderr, "shmget failed, try run shm_genrator first\n");
exit(EXIT_FAILURE);
}
shared_memory = shmat(shmid, NULL, 0);
if(shared_memory == (void *)-1){
fprintf(stderr, "shmat fialed\n");
exit(EXIT_FAILURE);
}
printf("Memory attach at %X\n", (long)shared_memory);
semid_gen = semget(SEM_KEY_GEN, 1, 0666);
semid_cus = semget(SEM_KEY_CUS, 1, 0666);
while(1){
if(!semaphore_p(semid_gen))
break;
strncpy(buf, shared_memory, BUF_SIZE);
printf("%d -- %s", getpid(), buf);
if(strncmp(buf, "end", 3) == 0)
break;
if(!semaphore_v(semid_cus))
break;
}
}
Makefile
CC=cc
CFLAGS= -pedantic -Wall -g
.c.o:
$(CC) $(CFLAGS) $(DFLAGS) -c $<
sem_imp.o: sem_imp.c sem_api.h
shm_customer.o: shm_customer.c sem_api.h shm_def.h
shm_generator.o: shm_generator.c sem_api.h shm_def.h
shm_customer: sem_imp.o shm_customer.o
$(CC) -o shm_customer $(DFLAGS) sem_imp.o shm_customer.o
shm_generator: sem_imp.o shm_generator.o
$(CC) -o shm_generator $(DFLAGS) sem_imp.o shm_generator.o
clean:
rm -f shm_customer shm_generator *.o *~
先运行 shm_generator 然后运行两个 shm_customer。输入一些文字,以 “end” 结束输入,运行结果如下:
消息队列提供了一种在两个不相关的进程之间传递数据的相当简单切有效的方法,与命名管道相比,消息队列的优势在与,他独立于发送和接收进程而存在, 这消除了同步命名管道打开和关闭时可能产生的一些困难。
与管道一样,每个数据块都有一个最大长度限制,系统中所有队列包含的全部数据块的总长度也有一个上限。linux 系统中有连个宏定义 MSGMAX
和 MSGMNB
, 他们以字节为单位分别定义了一条消息的最大长度和一个队列的最大长度。
#include <sys/msg.h>
int msgctl(int msgid, int cmd, struct msgid_ds *buf);
int msgget(key_t key, int msgflg);
int msgrcv(int mgsid, void *msg_ptr, size_t msg_sz, int msgtype, int msgflg);
int msgsnd(int mgsid, const void *msg_ptr, size_t msg_sz, int msgflg);
用 msgget 函数创建和访问一个消息队列。
key
: 提供一个键值来命名消息队列。
msgflg
: 低端 9 个比特是改信号量的权限,其作用类似与文件的访问权限,和 IPCCREAT 标识与后用于创建信号量。
msgsnd 函数用来把消息添加到消息队列中。
msgid
: msgget 返回的消息队列标识符。
msg_ptr
: 消息结构指针,消息结构大小不能超过上限,并且必须以一个长整型成员变量开始,例如:
struct my_message {
long int message_type;
/* The data you wish to transfer*/
};
msg_sz
: 消息长度,注意这个长度不包括消息类型的长度。
msgflg
: 控制在当前消息队列满或者队列消息到达系统范围的限制时将要发生的事情,如果 msgflg 中设置了 IPC_NOWAIT 标志,函数将 like 返回,不发送消息并且返回值为 -1,如果 msgflg 中的 IPC_NOWAIT 标志被清除,则发送进程将会挂起以等待队列中腾出可用空间。
msgrcv 函数从一个消息队列中获取消息。
msgid
: msgget 返回的消息队列标识符。
msg_ptr
: 一个指向准备接收消息的指针。
msg_sz
: 消息结构的长度,注意这个长度不包括消息类型的长度。
msgtype
: 实现一种简单的接收消息的优先级,如果 msgtpe 的值为 0,就获取队列中的第一个可用的消息,如果他的值大于 0,将获取消息类型值为 msgtype 的第一个消息,如果值小于 0,将获取消息类型等于或小于 msgtype 的绝对值的第一个消息。
msgflg
: 控制当队列中没有相应类型的消息时将要发生的事情,如果 msgflg 中设置了 IPC_NOWAIT 标志,函数将 like 返回,不发送消息并且返回值为 -1,如果 msgflg 中的 IPC_NOWAIT 标志被清除,则进程将会挂起以等待队列中有消息到达。
用来控制消息队列。
msg_id
: msgget 返回的标识符。
cmd
: 要采取才行动,取值如下:
命令 | 说明 |
---|---|
IPC_STAT | 把 msgid_ds 结构中的数据设置为消息队列的当前关联值 |
IPC_SET | 如果进程有足够的权限,就吧消息队列的当前关联值设置为 msgid_ds 结构的值 |
IPC_RMID | 删除消息队列 |
buf
: 结构 msgid_ds 至少包括以下成员:
struct msgid_ds {
uid_t shm_perm.uid;
uid_t shm_perm.gid;
mode_t shm_perm.mode;
}
此例程功能和内存共享中的例程相同,但是不需要要信号量的同步代码。
msg_def.h
#ifndef __MSG_DEF_H__
#define __MSG_DEF_H__
#define MAX_DATA_LEN 512
#define MSG_KEY 1234
struct msg_data_t {
long int type;
char data[MAX_DATA_LEN];
};
#endif
msg_snd.c
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/msg.h>
#include "msg_def.h"
int main(int argc, char *argv[])
{
int msgid;
char buf[MAX_DATA_LEN];
struct msg_data_t msgdata;
msgid = msgget((key_t)MSG_KEY, 0666 | IPC_CREAT);
if(msgid == -1){
fprintf(stderr, "msget failed\n");
exit(EXIT_FAILURE);
}
while(1){
printf("Enter some text:");
fgets(buf, MAX_DATA_LEN, stdin);
msgdata.type = 1;
strncpy(msgdata.data, buf, MAX_DATA_LEN);
if(msgsnd(msgid, (void *)&msgdata, MAX_DATA_LEN, 0) == -1){
fprintf(stderr, "msgsnd failed\n");
exit(EXIT_FAILURE);
}
if(strncmp(buf, "end", 3) == 0)
break;
}
if(msgctl(msgid, IPC_RMID, 0) == -1){
fprintf(stderr, "delete msg failed\n");
exit(EXIT_FAILURE);
}
exit(EXIT_SUCCESS);
}
msg_rcv.c
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/msg.h>
#include "msg_def.h"
int main(int argc, char *argv[])
{
int msgid;
char buf[MAX_DATA_LEN];
struct msg_data_t msgdata;
msgid = msgget((key_t)MSG_KEY, 0666);
if(msgid == -1){
fprintf(stderr, "msgget failed\n");
exit(EXIT_FAILURE);
}
while(1){
if(msgrcv(msgid, (void *)&msgdata, MAX_DATA_LEN, 0, 0) == -1){
fprintf(stderr, "mgrcv failed\n");
break;
}
strncpy(buf, msgdata.data, MAX_DATA_LEN);
printf("%d -- %s", getpid(), buf);
if(strncmp(buf, "end", 3) == 0)
break;
}
}
运行结果: