Chinaunix首页 | 论坛 | 博客
  • 博客访问: 170207
  • 博文数量: 60
  • 博客积分: 15
  • 博客等级: 民兵
  • 技术积分: 638
  • 用 户 组: 普通用户
  • 注册时间: 2010-11-26 10:59
个人简介

喜欢coding,因为那是一件伟大的事情,是将无生命的IC赋予灵魂的过程,让我拥有了和上帝一样的成就感。(w1c2g3@163.com)

文章分类

全部博文(60)

文章存档

2017年(7)

2016年(41)

2015年(1)

2014年(4)

2013年(7)

我的朋友

分类: LINUX

2013-11-01 10:47:29

Linux spinlock

                                                                                               ----based on linux3.11.2

struct spinlock

spin_lock是linux中最简单的锁机制,semaphore、mutex、seqlock都是基于spin_lock来实现。

spin_lock包括两个基本操作spin_lock、spin_unlock,以arm为例,看下其具体实现:
[include/linux/spinlock_types.h]

  1. typedef struct spinlock {
  2.         union {
  3.                 struct raw_spinlock rlock;

  4. #ifdef CONFIG_DEBUG_LOCK_ALLOC
  5. # define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
  6.                 struct {
  7.                         u8 __padding[LOCK_PADSIZE];
  8.                         struct lockdep_map dep_map;
  9.                 };
  10. #endif
  11.         };
  12. } spinlock_t;

先不care debug的部分,那么spinlock只有一个成员rlock,其type为raw_spinlock:

[include/linux/spinlock_types.h]
  1. typedef struct raw_spinlock {
  2.         arch_spinlock_t raw_lock;
  3. #ifdef CONFIG_GENERIC_LOCKBREAK
  4.         unsigned int break_lock;
  5. #endif
  6. #ifdef CONFIG_DEBUG_SPINLOCK
  7.         unsigned int magic, owner_cpu;
  8.         void *owner;
  9. #endif
  10. #ifdef CONFIG_DEBUG_LOCK_ALLOC
  11.         struct lockdep_map dep_map;
  12. #endif
  13. } raw_spinlock_t;
同样,raw_spinlock中只有一个成员raw_lock,其type为arch_spinlock_t:
[arch/arm/include/asm/spinlock_types.h]
  1. typedef struct {
  2.         union {
  3.                 u32 slock;
  4.                 struct __raw_tickets {
  5. #ifdef __ARMEB__
  6.                         u16 next;
  7.                         u16 owner;
  8. #else
  9.                         u16 owner;
  10.                         u16 next;
  11. #endif
  12.                 } tickets;
  13.         };
  14. } arch_spinlock_t;
arch_spinlock_t是架构相关的type,其中包括slock和__raw_tickets两个成员。
__ARMEB__代表Big-Endian,这样slock被划分成两部分:
                         u32 slock
----------------------------------------------------------
|          u16 next          |          u16 owner        |     
----------------------------------------------------------
bit31                   bit16 bit15                   bit0
 

spin_lock_init

初始化spinlock
[include/linux/spinlock.h]
  1. #define spin_lock_init(_lock) \
  2. do { \
  3.         spinlock_check(_lock); \
  4.         raw_spin_lock_init(&(_lock)->rlock); \
  5. } while (0)

spinlock_check是用来check参数是否是spinlock_t类型,通过rlock成员变量:
[include/linux/spinlock.h]

  1. static inline raw_spinlock_t *spinlock_check(spinlock_t *lock)
  2. {
  3.         return &lock->rlock;
  4. }
[include/linux/spinlock.h]
  1. # define raw_spin_lock_init(lock)                \
  2.     do { *(lock) = __RAW_SPIN_LOCK_UNLOCKED(lock); } while (0)
  3. #endif
[include/linux/spinlock_types.h]
  1. #define __RAW_SPIN_LOCK_UNLOCKED(lockname)    \
  2.     (raw_spinlock_t) __RAW_SPIN_LOCK_INITIALIZER(lockname)
[include/linux/spinlock_types.h]
  1. #define __RAW_SPIN_LOCK_INITIALIZER(lockname)    \
  2.     {                    \
  3.     .raw_lock = __ARCH_SPIN_LOCK_UNLOCKED,    \
  4.     SPIN_DEBUG_INIT(lockname)        \
  5.     SPIN_DEP_MAP_INIT(lockname) }
[arch/arm/include/asm/spinlock_types.h]
  1. #define __ARCH_SPIN_LOCK_UNLOCKED    { { 0 } }

初始化完成后,slock = tickets.next = tickets.owner = 0
 

spin_lock

[include/linux/spinlock.h]
  1. static inline void spin_lock(spinlock_t *lock)
  2. {
  3.     raw_spin_lock(&lock->rlock);
  4. }
[include/linux/spinlock.h]
  1. #define raw_spin_lock(lock)    _raw_spin_lock(lock)
[kernel/spinlock.c]
  1. void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)
  2. {
  3.     __raw_spin_lock(lock);
  4. }
  5. EXPORT_SYMBOL(_raw_spin_lock);
[include/linux/spinlock_api_smp.h]
  1. static inline void __raw_spin_lock(raw_spinlock_t *lock)
  2. {
  3.     preempt_disable();
  4.     spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
  5.     LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
  6. }

call到__raw_spin_lock后,首先会利用preempt_disable关闭调度器的抢占性,原因是避免在当前进程持有lock的情况下,CPU被中断后,会调到执行另一个进程,这样,当另一进程也试图spin_lock,因为spin_lock本身不会sleep,就会产生死锁现象。所以要暂时关闭调度器抢占性。

[include/linux/lockdep.h]
  1. ifdef CONFIG_DEBUG_LOCK_ALLOC
  2. # ifdef CONFIG_PROVE_LOCKING
  3. # define spin_acquire(l, s, t, i)        lock_acquire(l, s, t, 0, 2, NULL, i)
  4. # define spin_acquire_nest(l, s, t, n, i)    lock_acquire(l, s, t, 0, 2, n, i)
  5. # else
  6. # define spin_acquire(l, s, t, i)        lock_acquire(l, s, t, 0, 1, NULL, i)
  7. # define spin_acquire_nest(l, s, t, n, i)    lock_acquire(l, s, t, 0, 1, NULL, i)
  8. # endif
  9. # define spin_release(l, n, i)            lock_release(l, n, i)
  10. #else
  11. # define spin_acquire(l, s, t, i)        do { } while (0)
  12. # define spin_release(l, n, i)            do { } while (0)
  13. #endif

spin_acquire是供debug时使用,正常时定义为空。

[include/linux/lockdep.h]
  1. #define LOCK_CONTENDED(_lock, try, lock)            \
  2. do {                                \
  3.     if (!try(_lock)) {                    \
  4.         lock_contended(&(_lock)->dep_map, _RET_IP_);    \
  5.         lock(_lock);                    \
  6.     }                            \
  7.     lock_acquired(&(_lock)->dep_map, _RET_IP_);            \
  8. } while (0)

  9. #else /* CONFIG_LOCK_STAT */

  10. #define lock_contended(lockdep_map, ip) do {} while (0)
  11. #define lock_acquired(lockdep_map, ip) do {} while (0)

  12. #define LOCK_CONTENDED(_lock, try, lock) \
  13.     lock(_lock)

  14. #endif /* CONFIG_LOCK_STAT */

LOCK_CONTENDED macro会依照CONFIG_LOCK_STAT有两种不同的定义:

Ifdef CONFIG_LOCK_STAT

首先会利用do_raw_spin_trylock来尝试lock一次,如果没有拿到的话,再调用do_raw_spin_lock来进行加lock。

Ifndef CONFIG_LOCK_STAT

            直接调用do_raw_spin_lock来进行加lock。

在目前arm config里,没有定义CONFIG_LOCK_STAT,所以,我们直接跳到do_raw_spin_lock。

[include/linux/spinlock.h]
  1. static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock)
  2. {
  3.     __acquire(lock);
  4.     arch_spin_lock(&lock->raw_lock);
  5. }
[arch/arm/include/asm/spinlock.h]
  1. static inline void arch_spin_lock(arch_spinlock_t *lock)
  2. {
  3.     unsigned long tmp;
  4.     u32 newval;
  5.     arch_spinlock_t lockval;

  6.     __asm__ __volatile__(
  7. "1:    ldrex    %0, [%3]\n"
  8. "    add    %1, %0, %4\n"
  9. "    strex    %2, %1, [%3]\n"
  10. "    teq    %2, #0\n"
  11. "    bne    1b"
  12.     : "=&r" (lockval), "=&r" (newval), "=&r" (tmp)
  13.     : "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
  14.     : "cc");

  15.     while (lockval.tickets.next != lockval.tickets.owner) {
  16.         wfe();
  17.         lockval.tickets.owner = ACCESS_ONCE(lock->tickets.owner);
  18.     }

  19.     smp_mb();
  20. }

arch_spin_lock的核心部分是通过内嵌汇编的方式完成的,因为要保证指令的原子性。其中,

%0 -> lockval

%1 -> newval

%2 -> tmp

%3 -> &lock->slock

%4 -> 1 << TICKET_SHIFT = (1<<16)

line 8:lockval = *lock

line 9:newval = lockval->slock + (1<<16)

line 10:这是整个函数的重点,若是exclusive access 则将tmp 设为 0 并且把newval存储到lock->slock,若是 non-exclusive access(有其他 CPU 来动过) 则 tmp设为 1并且不做存储lock->slock的动作。

line 11:判断tmp是否为0,置cpsr->Z

line 12:如果cpsr->Z不为0,即tmp != 0,jump to tag 1

在成功update lock->slock之后,因为union的关系,lock->tickets.next会增加1。因为此时lockval.tickets.next = lockval.tickets.owner = 0,所以没有走进while。

smp_mb仅仅用于保证SMP 的memory读写顺序。
 

spin_unlock

[include/linux/spinlock.h]
  1. static inline void spin_unlock(spinlock_t *lock)
  2. {
  3.     raw_spin_unlock(&lock->rlock);
  4. }
[include/linux/spinlock.h]
  1. #define raw_spin_unlock(lock)        _raw_spin_unlock(lock)
[kernel/spinlock.c]
  1. void __lockfunc _raw_spin_unlock(raw_spinlock_t *lock)
  2. {
  3.     __raw_spin_unlock(lock);
  4. }
  5. EXPORT_SYMBOL(_raw_spin_unlock);
  6. #endif
[include/linux/spinlock_api_smp.h]
  1. static inline void __raw_spin_unlock(raw_spinlock_t *lock)
  2. {
  3.     spin_release(&lock->dep_map, 1, _RET_IP_);
  4.     do_raw_spin_unlock(lock);
  5.     preempt_enable();
  6. }

spin_release用于debug,暂时不care。preempt_enable是重新打开调度器的抢占特性。

真正unlock的是do_raw_spin_unlock:

[include/linux/spinlock.h]
  1. static inline void do_raw_spin_unlock(raw_spinlock_t *lock) __releases(lock)
  2. {
  3.     arch_spin_unlock(&lock->raw_lock);
  4.     __release(lock);
  5. }
[arch/arm/include/asm/spinlock.h]
  1. static inline void arch_spin_unlock(arch_spinlock_t *lock)
  2. {
  3.     smp_mb();
  4.     lock->tickets.owner++;
  5.     dsb_sev();
  6. }

arch_spin_unlock里仅仅将lock->tickets.owner+1。

dsb_sev中也会内嵌汇编实现:

[arch/arm/include/asm/spinlock.h]
  1. static inline void dsb_sev(void)
  2. {
  3. #if __LINUX_ARM_ARCH__ >= 7
  4.     __asm__ __volatile__ (
  5.         "dsb\n"
  6.         SEV
  7.     );
  8. #else
  9.     __asm__ __volatile__ (
  10.         "mcr p15, 0, %0, c7, c10, 4\n"
  11.         SEV
  12.         : : "r" (0)
  13.     );
  14. #endif
  15. }

对于armv7架构,是调用dsb命令,保证它执行完毕后,才会执行程序中位于此指令后的指令。

可以看出,如果有其他的task再次调用spin_lock,就会因为lock时的lockval.tickets.next++,

从而,lockval.tickets.next != lockval.tickets.owner一直在loop中。直到持锁的task调用spin_unlock,lockval.tickets.owner++后,才会跳出loop。

这个机制是在Linux-2.6.25中加入内核,Linux-3.6中加入ARM架构的,也就是接下来介绍的,大名鼎鼎的 Ticket spinlocks
 

spin_trylock

Kernel中还提供了一种无等待的lock api,称为spin_trylock:

[include/linux/spinlock.h]
  1. static inline int spin_trylock(spinlock_t *lock)
  2. {
  3.     return raw_spin_trylock(&lock->rlock);
  4. }
[include/linux/spinlock.h]
  1. #define raw_spin_trylock(lock)    __cond_lock(lock, _raw_spin_trylock(lock))
[kernel/spinlock.c]
  1. int __lockfunc _raw_spin_trylock(raw_spinlock_t *lock)
  2. {
  3.     return __raw_spin_trylock(lock);
  4. }
  5. EXPORT_SYMBOL(_raw_spin_trylock);
[include/linux/spinlock_api_smp.h]
  1. static inline int __raw_spin_trylock(raw_spinlock_t *lock)
  2. {
  3.     preempt_disable();
  4.     if (do_raw_spin_trylock(lock)) {
  5.         spin_acquire(&lock->dep_map, 0, 1, _RET_IP_);
  6.         return 1;
  7.     }
  8.     preempt_enable();
  9.     return 0;
  10. }

__raw_spin_trylock会在禁止调度器抢占的条件下运行,调用do_raw_spin_trylock来获取spinlock:

[include/linux/spinlock.h]
  1. static inline int do_raw_spin_trylock(raw_spinlock_t *lock)
  2. {
  3.     return arch_spin_trylock(&(lock)->raw_lock);
  4. }
[arch/arm/include/asm/spinlock.h]
  1. static inline int arch_spin_trylock(arch_spinlock_t *lock)
  2. {
  3.     unsigned long contended, res;
  4.     u32 slock;

  5.     do {
  6.         __asm__ __volatile__(
  7.         "    ldrex    %0, [%3]\n"
  8.         "    mov    %2, #0\n"
  9.         "    subs    %1, %0, %0, ror #16\n"
  10.         "    addeq    %0, %0, %4\n"
  11.         "    strexeq    %2, %0, [%3]"
  12.         : "=&r" (slock), "=&r" (contended), "=&r" (res)
  13.         : "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
  14.         : "cc");
  15.     } while (res);

  16.     if (!contended) {
  17.         smp_mb();
  18.         return 1;
  19.     } else {
  20.         return 0;
  21.     }
  22. }

同样,会调到汇编来实现原子操作,变量标号:

%0 -> slock

%1 -> contended

%2 -> res

%3 -> &lock->slock

%4 -> 1 << TICKET_SHIFT = (1<<16)

line 8:slock = lock->slock

line 9:res = 0

line 10:contended = lockval.tickets.owner - lockval.tickets.next

line 11: if(contended == 0) slock.tickets.next++

line 12: 若是exclusive access 则将res 设为 0 并且把局部变量slock存储到lock->slock,若是 non-exclusive access(有其他 CPU 来动过) 则res设为 1并且不做存储lock->slock的动作。

line 16: 若是 non-exclusive access,do these again

如果成功lock返回1,失败返回0。
 

Ticket spinlocks

在arm架构,linux-3.6之前版本的kernel中,spinlock是通过一个整型数值来表示。值0表示lock可用。spin_lock通过原子方式设置这个数值为1,表示lock已被占用,如果有其他cpu想要再次lock,会一直循环check这个数值。当spin_unlock时,会将数值置0。假设有多个CPU在等待lock,并不能保证等待最久的CPU可以最先拿到lock。这就是传统spin_lock的“不公平”性。

传统自旋锁的“不公平”问题在锁竞争激烈的服务器系统中尤为严重,因此 Linux 内核开发者 Nick Piggin 在 Linux 内核 中引入了排队自旋锁:通过保存执行线程申请锁的顺序信息来解决“不公平”问题。

排队自旋锁仍然使用原有的 raw_spinlock_t 数据结构,但是赋予 slock 域新的含义。为了保存顺序信息,slock 域被分成两部分,分别保存锁持有者和未来锁申请者的票据序号(Ticket Number),如下图所示:

                         u32 slock
----------------------------------------------------------
|          u16 next          |          u16 owner        |     
----------------------------------------------------------
bit31                   bit16 bit15                   bit0

owner 和 next 域均为 16 位,其中 owner 域为 slock 的低 16 位。可见排队自旋锁最多支持 2^16=65536 个处理器。

只有 next 域与 owner 域相等时,才表明锁处于未使用状态(此时也无人申请该锁)。排队自旋锁初始化时 slock 被置为 0,即 owner 和 next 置为0。内核执行线程申请自旋锁时,原子地将 next 域加 1,并将原值返回作为自己的票据序号。如果返回的票据序号等于申请时的 owner 值,说明自旋锁处于未使用状态,则直接获得锁;否则,该线程忙等待检查 owner 域是否等于自己持有的票据序号,一旦相等,则表明锁轮到自己获取。线程释放锁时,原子地将 owner 域加 1 即可,下一个线程将会发现这一变化,从忙等待状态中退出。线程将严格地按照申请顺序依次获取排队自旋锁,从而完全解决了“不公平”问题。

 
 
 

Recefrence:

[1] 《深入Linux设备驱动程序内核机制》

[2] 《Linux Kernel Development》3rd Edition

[3]

[4] http://www.ibm.com/developerworks/cn/linux/l-cn-spinlock/





阅读(2708) | 评论(1) | 转发(0) |
0

上一篇:wait queue

下一篇:sort

给主人留下些什么吧!~~

coldsnow32013-11-01 12:31:08

driver说:我平凡得不能再平凡,我相信万事万物皆有生命,我的存在让世界看清了IC的灵动;嗯,这就是我存在的意义。