最近更新: 2011-01-03

如何以 semaphore 進行資源的共用互斥鎖定

在 Unix 系統中,資源的共用機制,通常以檔案鎖最為常見,也最為容易使用,只要使用 flock() 即可對檔案進行 LOCK_SH (共享鎖定)、 LOCK_EX (互斥鎖定)。但是其他的資源就沒這麼方便,例如共享記憶體完全沒有提供鎖定功能,此時就必須借助其他的行程間通訊機制進行協調,例如「號誌」(semaphore)。

本文即在說明,如何以號誌實作一個如 flock() 具有 LOCK_SH, LOCK_EX, LOCK_UN, LOCK_NB 四種參數的「資源共用互斥鎖定函數」。

這是一篇我寫的舊文章。因為前陣子要對其他人說明共用與排他的資源鎖定觀念,所以又翻了出來,重新排版。

關於 semaphore 的中文譯名,我當初(1999年)是依《新洞悉UNIX系列叢書 - 系統程式設計篇》的用法。它將 semaphore 譯為「信號」,而 signal 譯為「訊號」。不過這兩個名詞實在很容易混淆,故我現在改用「號誌」稱呼 semaphore 。

實作說明

我將運用 semget(), semctl(), semop() 實作一個 lock_shm() 的資源共用互斥鎖定函數。它的行為模仿 flock(),定義了下列四個上鎖行為。

  • LOCK_SH - 加上共享鎖定(讀取鎖定)
  • LOCK_EX - 加上排他鎖定(寫入鎖定)
  • LOCK_NB - 不擱置( non-blocking ,配合前兩者使用 )
  • LOCK_UN - 解除鎖定

此函數提供了一個近似 flock() 行為的鎖定函數。lock_shm() 並不是直接對共享記憶體上鎖。事實上 Unix 並沒有鎖定共享記憶體的動作,所以我是用號誌的互斥動作對共享記憶體上鎖。換言之,lock_shm() 不只是個「共享記憶體鎖」,更是一個「共用互斥資源鎖」。「共用互斥」之英文為: Mutual exclusion - 可共用、也可排他。可以很多人一起使用;也可排除他人只給自已用。

BSD 家族使用 mmap() 將檔案內容映射到記憶體,藉此達成行程間共用記憶體的目的。由於此機制是透過檔案描述子 (fd) 實現,所以可以對檔案描述子上鎖。從這些 IPC 機制間的差異,可以略約窺探到 SVR 家族與 BSD 家族之設計理念的差異。
上鎖流程示意圖
上鎖流程示意圖

上圖假設有 1 、 2 、 3 三個行程打算上鎖的四個階段。 其中被紅色方框圍住的動作,表示被擱置了。 擱置動作是由作業系統處理的。

  1. 行程 1 想加上共享鎖(SHLOCK)。
    1. 行程 1 先鎖住 EXLOCK ,再加上 SHLOCK 。
    2. 行程 2 想鎖住 EXLOCK ,但被擱置。
    3. 行程 3 想鎖住 EXLOCK ,但被擱置。
  2. 行程 1 完成上鎖動作。
    1. 行程 1 加上 SHLOCK 後,解開 EXLOCK ,完成上鎖動作。
    2. 行程 2 鎖住 EXLOCK ,但被擱置,等 行程 1 之前加上的 SHLOCK 解開。
    3. 行程 3 依然被擱置在等待鎖住 EXLOCK 的地方。
  3. 行程 1 解鎖,行程 2 結束等待,完成上鎖動作。
    1. 行程 1 解開 SHLOCK 。
    2. 行程 2 已無 SHLOCK , 行程 2 結束擱置狀態,完成上鎖動作。
    3. 行程 3 依然被擱置在等待鎖住 EXLOCK 的地方。
  4. 行程 2 解鎖,行程 3 結束等待,完成上鎖。
    1. 行程 2 解開 EXLOCK 。
    2. 行程 3 鎖住 EXLOCK ,再加上 SHLOCK ,最後解開 EXLOCK ,完成上鎖動作。
實作方式

因為同時要應付 LOCK_EX 及 LOCK_SH 兩種情形,故需要兩個號誌為一組。 一個號誌控制排他鎖(EXLOCK - exclusion lock),另一個號誌控制共用鎖(SHLOCK - shared lock)。

因為排他鎖的特性是,一次只能用一個,故此號誌的初值為 1。 而共用鎖則是可以同時好幾個使用者共用,故此號誌的初值設為 0 。

還有一點要注意的是,排他鎖與共用鎖不會同時存在,當共用鎖被鎖上時,想鎖上排他鎖的動作就會被擱置,等到共用鎖全部都解除之後,才能鎖上排他鎖。同樣地,後來的共用鎖上鎖動作,又必須等待前一個排隊中的排他鎖上鎖動作解除後才可以鎖上。 亦即後來的排他鎖必須等前面的共用鎖全部解除;後來的共用鎖必須等前面的排他鎖解除。

如何鎖上共用鎖
  1. 鎖上排他鎖: EXLOCK.sem_op -1 。
    等待先前的排他鎖解除,並防止後來的上鎖動作插隊。當 EXLOCK 等於 0 時,試圖將其減一的動作,就會被系統擱置,直到 EXLOCK 的值大於等於 -1 的絕對值。
  2. 鎖上共用鎖: SHLOCK.sem_op +1 。
    因為共用鎖可以同時鎖上多個,所以上鎖動作是用加的。 SHLOCK 的值,會等於目前取得共用權利的程序的數目。
  3. 解除排他鎖: EXLOCK.sem_op +1 。
    讓後來的上鎖動作可以繼續動作。 但是後到的 LOCK_EX 將會等待 SHLOCK 的值變為 0。而在它等待的時間裡,後到的 LOCK_SH 將會等待它解除 EXLOCK
如何鎖上排他鎖
  1. 鎖上排他鎖: EXLOCK.sem_op -1 。
    如果 EXLOCK 等於 1 ,則減一的動作會使 EXLOCK 變為 0 (1 + -1 = 0)。如果 EXLOCK 等於 0 ,則試圖減一的動作將會被擱置。
  2. 等待共用鎖全部解除: SHLOCK.sem_op 等於 0。
    等待 SHLOCK 的值變為 0 ,亦即所有的共用鎖都已解除。 我第一步都先鎖上排他鎖,就是為了防止後來的上鎖動作插隊。
如何解除鎖定

這講起來比較麻煩,我就懶得細述了,看個人理解能力吧。

  1. if ( EXLOCK =1 and SHLOCK > 0 ) then SHLOCK -1 (解除 SHLOCK)
  2. if ( EXLOCK =0 and has process wait SHLOCK be zero) then SHLOCK -1 (解除 SHLOCK)
  3. if ( EXLOCK =0 but no process wait SHLOCK be zero) then EXLOCK +1 (解除 EXLOCK)

實作內容

程式是我從我的一個舊軟體 (Firebird BBS library) 中的 bcache.c 中擷取出來的。bcache.c 是一個共享記憶體使用模組,並利用號誌來實作其中的 lock_shm() 功能。舊文的內容已經不能編譯了,故我這次改版時順便改寫了程式碼,並在 Ubuntu 10.04 下編譯實測過。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
/* Firebird BBS, The TIP Project. 資料快取模組: bcache.c */
#include <stdio.h>

#include <stdlib.h>

#include <string.h>

#include <sys/types.h>

#include <sys/stat.h>

#include <unistd.h>

#include <sys/file.h>

#include <sys/ipc.h>

//#include "bbsdefs.h"

//#include "config.h"

//#include "struct.h"

#include <sys/sem.h>


#if !defined(LOCK_SH)
    #define LOCK_SH		1	/* shared lock */
#endif
#if !defined(LOCK_EX)
    #define LOCK_EX		2	/* exclusive lock */
#endif
#if !defined(LOCK_NB)
    #define LOCK_NB		4	/* don't block when locking */
#endif
#if !defined(LOCK_UN)
    #define LOCK_UN		8	/* unlock */
#endif

/* write by rock
  求 IPC 鍵值。
  須傳入一檔案名稱,若該檔不存在, IPC_key() 會自動建立。

  RC: 成功(鍵值)、失敗(-1)
  See also: stat(), ftok()
*/
static int IPC_key(const char*ipcname) {
    struct stat st;
    int fd;
    if( stat(ipcname, &st) == 0 && S_ISREG( st.st_mode) )
        return st.st_ino;
    if((fd=creat(ipcname, 0644))<0)
        return -1;
    fstat(fd, &st);
    close(fd);
    return st.st_ino; // 以 i_node 的值做為 IPC key

    //不理會 SYSV 的 ftok() 函數。

    // ftok() 是 SVR 所提供的函數,在 POSIX 及 BSD 中沒有。

    //return ftok(ipcname,proj);

}

/*
  NAME: lock_shm() 鎖定共享記憶體。

    LOCK_SH 加上共享鎖定(讀取鎖定)
    LOCK_EX 加上互斥鎖定(寫入鎖定)
    LOCK_NB 不擱置( non-blocking ,配合前兩者使用 )
    LOCK_UN 解除鎖定

  此函數提供了一個近似 flock() 行為的 shm 鎖定函數。

  RC: 成功(0)、失敗(-1)
  See also: semget(), semctl(),  semop()
*/
int lock_shm(const char*shmname,int op) {
    enum { EXLOCK,  SHLOCK} lock;
    struct sembuf lockop={0, 0, SEM_UNDO} /* sem 操作指令*/;
    int semkey,  semid;
    ushort sem_val[] = {
        1 /*Init value of EXLOCK*/,
        0 /*Init value of SHLOCK*/ };

    if( (semkey=IPC_key(shmname)) < 0) //嘗試建立 sem

        return -1;

    if( (semid=semget(semkey,  2, IPC_CREAT|IPC_EXCL|0640)) >= 0) {
        if( semctl(semid, 0, SETALL, sem_val) < 0)	// 初始 sem 的值

            return -1;
    }
    else {
        if((semid=semget(semkey,1,0)) < 0) //sem 可能建立了,取得semid

            return -1;
    }

    /*{
    ushort sval[2];
    semctl(semid,0,GETALL,sval);
    printf("[%d] Value  of EXLOCK:%d, Value of SHLOCK:%d\n",
        semid, sval[EXLOCK], sval[SHLOCK]);
    }*/

    /*
    non-blocking 鎖定,只要多設一個旗標 IPC_NOWAIT 即達目的。
    連傳回值也一樣:當無法立即取得對 sem 的操作時, semop() 不會被擱置,
    會立即返回(傳回 -1 ),並設定 errno 為 EAGAIN 。
    */
    if( op & LOCK_NB )
        lockop.sem_flg |= IPC_NOWAIT;

    if( op & LOCK_EX ) {
        lockop.sem_num = EXLOCK;
        lockop.sem_op = -1;

        if( semop(semid,  &lockop, 1) < 0 )
            return -1;
        //printf("EXLOCK... wait SHLOCK be zero\n");

        lockop.sem_num  = SHLOCK;
        lockop.sem_op = 0;
        return semop(semid, &lockop, 1);
    }
    else if( op &  LOCK_SH ) {
        lockop.sem_num = EXLOCK;
        lockop.sem_op = -1;
        if( semop(semid, &lockop,  1) < 0 )
            return -1;
        //printf("EXLOCK... wait to get SHLOCK\n");

        lockop.sem_num  = SHLOCK;
        lockop.sem_op = 1;
        if( semop(semid, &lockop, 1) < 0 )
            return -1;
        //printf("SHLOCK  ok\n");

        lockop.sem_num = EXLOCK;
        lockop.sem_op = 1;
        //printf("Release EXLOCK\n");

        return semop(semid, &lockop, 1);
    }

    //以下動作皆為 LOCK_UN

    if( semctl(semid, 0, GETALL,  sem_val) < 0 )
        return -1;
    //printf("LOCK_UN, EXLOCK:%d, SHLOCK:%d\n",sem_val[0],  sem_val[1]);


    /*
    如果沒有程序在等待取得鎖定的話,那解除鎖定的動作,將會是"刪除 sem ",而不
    是"改變 sem 的值"。
    如果有程序等待取得 LOCK_EX 的話,那就是在等待 SHLOCK 的值變為 0 。
    如果有程序等待取得 LOCK_SH 的話,那就是在等待 EXLOCK 的值增加。
    */
    if( semctl(semid, SHLOCK, GETZCNT, 0) == 0 &&
        semctl(semid, EXLOCK, GETNCNT, 0)  == 0 )
    {
        //printf("may remove...");

        if( sem_val[EXLOCK] <= 1 && sem_val[SHLOCK]  == 0) {
            /*printf("remove sem %d,Z:%d,N:%d\n", semid,
              semctl(semid,SHLOCK,GETZCNT,0),
              semctl(semid,EXLOCK,GETNCNT,0));
            */
           return semctl(semid, 0, IPC_RMID, 0);
        }
    }

    if( sem_val[EXLOCK] >0 && sem_val[SHLOCK]  > 0 ) {
        lockop.sem_num = SHLOCK;
        lockop.sem_op = -1;
        //printf("EXLOCK >0 and SHLOCK  >0: UNLOCK SHLOCK\n");

        return semop(semid, &lockop, 1);
    }
    else if( sem_val[EXLOCK]  <= 0 ) {
        if( semctl(semid, SHLOCK, GETZCNT, NULL) >0 ) {
            lockop.sem_num = SHLOCK;
            lockop.sem_op = -1;
            //printf("UNLOCK SHLOCK\n");

        }
        else {
            lockop.sem_num = EXLOCK;
            lockop.sem_op = 1;
            //printf("UNLOCK EXLOCK\n");

        }
        return semop(semid, &lockop,  1);
    }
    return 0;
}

因為實作上的技術問題,一個設計不良的解鎖動作(LOCK_UN),可能會破壞掉目前正在等待取得鎖定的程序隊伍。 例如: 行程a 尚未鎖上任何排他鎖或共用鎖,卻要求解鎖。此時會誤將行程b 加上的鎖給解除了。如果此時有另一行程c 正等待行程b 解除鎖定,就會誤以為行為b 已經解除鎖定(實際上是被行程a 越過界給解除了),開始存取資源,造成行程b 和行程c 互搶資源。

這個問題在技術上並不難克服。只是我懶得做了。

以下是測試程式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <stdio.h>

#include <stdlib.h>

#include <string.h>


#if !defined(LOCK_SH)
    #define LOCK_SH		1	/* shared lock */
#endif
#if !defined(LOCK_EX)
    #define LOCK_EX		2	/* exclusive lock */
#endif
#if !defined(LOCK_NB)
    #define LOCK_NB		4	/* don't block when locking */
#endif
#if !defined(LOCK_UN)
    #define LOCK_UN		8	/* unlock */
#endif
int lock_shm(const char*shmname,int op);

void dump(char *prog) {
    char buf[80];
    sprintf(buf,"dump[%s].",prog);
    write(1, buf, strlen(buf));
    sleep(1);
}

void main(int argc, char* argv[]) {
    char  shmname[] = "TEST_BCACHE";
    int i;
    char *prog;
    prog= argv[1];
    printf("[%s]LOCK_EX:%d\n",  prog, lock_shm(shmname,LOCK_EX) );
    for(i = 0; i < 5; i++)
        dump(prog);
    printf("\n[%s]LOCK_UN:%d\n",  prog, lock_shm(shmname,LOCK_UN) );
    printf("[%s]LOCK_SH:%d\n", prog, lock_shm(shmname,LOCK_SH)  );
    for(i =0; i <5; i++)
        dump(prog);
    printf("\n[%s]LOCK_UN:%d\n", prog, lock_shm(shmname,LOCK_UN)  );
    printf("[%s]LOCK_EX:%d\n", prog, lock_shm(shmname,LOCK_EX) );
    for(i =0; i <5;  i++)
        dump(prog);
    printf("\n[%s]LOCK_UN:%d\n", prog, lock_shm(shmname,LOCK_UN)  );
}

執行範例:


$ gcc -o testlock testlock.c lock_shm.c
$ ./testlock a & ./testlock b &
[1] 3285
[2] 3286
$ [a]LOCK_EX:0
dump[a].dump[a].dump[a].dump[a].dump[a].
[a]LOCK_UN:0
[b]LOCK_EX:0
dump[b].dump[b].dump[b].dump[b].dump[b].
[b]LOCK_UN:0
[a]LOCK_SH:0
dump[a].[b]LOCK_SH:0
dump[b].dump[a].dump[b].dump[a].dump[b].dump[a].dump[b].dump[a].dump[b].
[a]LOCK_UN:0

[b]LOCK_UN:0
[a]LOCK_EX:0
dump[a].dump[a].dump[a].dump[a].dump[a].
[a]LOCK_UN:0
[b]LOCK_EX:0
dump[b].dump[b].dump[b].dump[b].dump[b].
[b]LOCK_UN:0

[1]-  Done                    ./testlock a
[2]+  Done                    ./testlock b

先跑幾隻丟到背景,等執行一部份後,再跑幾隻到背景,就可以觀察數隻程式利用號誌協調執行順序的情形了。以前面的執行範例為例,行程a 先取得了排他鎖,所以一開始只會看到 dump[a] 。接著換行程b 取得排他鎖,所以接著只會看到 dump[b] 。再接著,行程a 與 b 取得共用鎖,所以 dump[a], dump[b] 將交錯出現。

參考文件

  • 新洞悉UNIX系列叢書 - 系統程式設計篇, 劉祖亮著,和碩科技出版
    第 414 到 419 頁。
  • semget()
  • semctl()
  • semop()

這三個函式,我在 Solaris, Linux 和 FreeBSD 都使用過。都被納入 POSIX 規格之中,具有可攜性。

樂多舊網址: http://blog.roodo.com/rocksaying/archives/14848695.html