“以无锁为有锁,以无限为有限,此乃并发编程的最高境界”,无锁就是lock free,某些线程可以有限步执行完,尽管任何一个线程都可能饥饿。有限就是说wait free任何线程都在有限步内完成,不会出现饥饿的情况。wait free 是多核编程的顶峰,大多数wait free 都是java平台,其实微软也提供了部分lock free。
这是windows并发编程的一段代码
#include "stdafx.h"
#include
#include
#include
using namespace std;
template
struct DataItem {
SLIST_ENTRY m_listEntry; //必须是第一个
T m_value ;
};
int _tmain(int argc, _TCHAR* argv[])
{
// Declare and initialize the list head.
SLIST_HEADER listHead;
InitializeSListHead(&listHead);
// Push 1e items onto the stack.
for(int i = 0; i<10;i++)
{
DataItem * d = (DataItem *)malloc(sizeof( DataItem ));
d->m_value = i;
cout << d->m_value << endl;
InterlockedPushEntrySList(&listHead, &d->m_listEntry);
}
// Pop 5 items off the stack.
for (int i = 0; i<5; i++)
{
DataItem * d = (DataItem *)
InterlockedPopEntrySList(&listHead);
// assert(d && d->m_value == (10 - i -1 )) ;
cout << d->m_value << endl;
free( d) ;
}
// Now flush the rema1n1ng contents of the list.
DataItem * d = (DataItem *)InterlockedFlushSList(&listHead);
while (d)
{
DataItem * next = (DataItem *)d->m_listEntry.Next;
assert(d);
cout << d->m_value < free(d);
d = next;
}
// We expect the list is empty by now.
assert(InterlockedPopEntrySList(&listHead) == NULL);
getchar();
return 0;
}
稍加封装
/* Interlocked SList queue using keyed event signaling */
struct queue {
SLIST_HEADER slist;
// Note: Multiple queues can (and should) share a keyed event handle
HANDLE keyed_event;
// Initial value: 0
// Prior to blocking, the queue_pop function increments this to 1, then
// rechecks the queue. If it finds an item, it attempts to compxchg back to
// 0; if this fails, then it's racing with a push, and has to block
LONG block_flag;
};
void init_queue(queue *qPtr) {
NtCreateKeyedEvent(&qPtr->keyed_event, -1, NULL, 0);
InitializeSListHead(&qPtr->slist);
qPtr->blocking = 0;
}
void queue_push(queue *qPtr, SLIST_ENTRY *entry) {
InterlockedPushEntrySList(&qPtr->slist, entry);
// Transition block flag 1 -> 0. If this succeeds (block flag was 1), we
// have committed to a keyed-event handshake
LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
if (oldv) {
NtReleaseKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
}
}
SLIST_ENTRY *queue_pop(queue *qPtr) {
SLIST_ENTRY *entry = InterlockedPopEntrySList(&qPtr->slist);
if (entry)
return entry; // fast path
// Transition block flag 0 -> 1. We must recheck the queue after this point
// in case we race with queue_push; however since ReleaseKeyedEvent
// blocks until it is matched up with a wait, we must perform the wait if
// queue_push sees us
LONG oldv = InterlockedCompareExchange(&qPtr->block_flag, 1, 0);
assert(oldv == 0);
entry = InterlockedPopEntrySList(&qPtr->slist);
if (entry) {
// Try to abort
oldv = InterlockedCompareExchange(&qPtr->block_flag, 0, 1);
if (oldv == 1)
return entry; // nobody saw us, we can just exit with the value
}
// Either we don't have an entry, or we are forced to wait because
// queue_push saw our block flag. So do the wait
NtWaitForKeyedEvent(qPtr->keyed_event, (PVOID)qPtr, FALSE, NULL);
// block_flag has been reset by queue_push
if (!entry)
entry = InterlockedPopEntrySList(&qPtr->slist);
assert(entry);
return entry;
}
注意 这段代码使用了未公开的windows api,很遗憾的是 ,实现无锁的queue不是很难的事情,但是微软没有实现。
很难想象windows vista 之前的线程同步都是用内核对象,微软推荐的CriticalSection也是使用内核对象实现的,而且应该放入try/catch中,但是在low memory如果出现了异常,CRITICAL_SECTION对象将无法确定状态,也无法使用,而Keyed Events解决了这个问题。
阅读(5034) | 评论(0) | 转发(1) |