基于CAS(compare-and-swap)操作的无锁算法,几乎都可以采用《》来转化为无等待算法,从而获得更强的progress。
相对于《》中的算法所采用的类似无等待队列,本文换成一种基于getAndAdd+CAS的无等待队列。本文我用自己的方式来介绍下:- 如何从一个基于CAS的正确(一致性)的无锁算法转化为无等待算法,针对CAS操作建模(无等待算法)
- 同时基于环形的线程专属数据环,由于依赖于具体的数据偏移,降低了公平性。所以使用一种更快速的类似wait-free队列,从而获得更好的公平性。
几乎所有的无锁算法(先不考虑垃圾回收),对于一个线程而言,都是一个如下的过程:
要么成功(本操作make progress)、要么失败(其他线程的操作make progress)从而本操作需要更新数据(preCasOp),再次重复直到成功。并且最后要执行必要的postCasOp。
那么,对于一个value,我们将它置于一个record里:
struct Record { void* value;};struct Record* Head;
算法的关键在于,对一个head的原子改变:
lock-free-method(void* value) { for(;;) { before; Record* head = Head; Record* myRecord = Generate(head, value); if(CAS(&Head, head, myRecord)) { after; return; } }}
这里的问题在于,对于一个操作lock-free-method(val),它在这一行CAS(&Head, head, myRecord),我们需要在这一行取得成功才能算本操作make progress。但是我们无法确定什么时候能够取得成功。在操作系统内核的调度算法越来越好的情况下,我们很可能绝大多数情况下不会感受到某次操作的延迟。但是有没有办法能够确保,本操作能够在有限步之后完成。这种算法性质就叫做wait-freedom。
我们转化成的wait-free算法具有如下结构:- 每个线程都先在fast-path中尝试MAX-fast次,假如某次成功则返回,假如成功的次数达到ENOUTH-fast(per thread),那么就通过shareMemory协助处于slow-path中的操作一次。
- 假如fast-path中尝试MAX-fast次仍未成功,则构造自己的请求信息放到shareMemroy,接着进入slow-path。
- 进入slow-path的线程,不断地尝试,最终会在有限次尝试内,自己完成操作,或者由其他线程帮助完成。
-
协助过程,可以抽象出取得一致性的三个接口:
HELP(handle) HELP_TRANSFER(head) HELP_TRANSFER_VALUE(head, handle)
对于如上这个具体操作,我们可以做如下操作:
首先,给Record结构增加一个Handle_t数据类型:typedef struct Record { void* value; Handle_t* handle;} Record;typedef struct Handle_t { void* value; struct Wrapper* wrapper;} Handle_t;typedef struct Wrapper { bool isFinish; Record* record;} Wrapper;
我们把lock-free-method进行改造:
- 原来的lock-free-method作为fast-path,给定两个阈值,MAX1:让它执行够次数就退出。构造原来的请求,放入共享内存中(可以是并发队列)。 MAX2: 在fast-path中执行成功的次数,从而主动help slow-path中的请求
- CAS操作不再像原来一样在在value的值上改变,而是根据record->handle是否为空来继续操作:为空则尝试自己的请求,不为空则尝试帮助handle对应的请求make progress。(同理,fast-path也进行修改。)
- handle中会有个指向Record的指针,以及代表本操作是否完成的bit和version。
所以,slow-path通过引入两个中间状态(states),把本来一个CAS操作变为了三个CAS操作:
- STEP 1: (exclusive)线程A尝试把原来的Record,指向请求所对应的Handle,make progress(state 1,该Record被某个线程锚定).
- STEP 2: (consisent)某个线程B(B为任意线程)从Record中取出Handle,并且将该Handle指向这个Record,同时设置finish bit,从而告知该次请求完成,make progress.(state 2,说明此事操作已经完成)
- STEP 3: 线程A从自己的handle中获知操作已完成(finish bit,stete 2导致的),并且获取自己锚定的Record,更新Record的值,从而完成收尾工作,并且返回自己的操作结果(state 3,此时Record的值又回到了原来的状态,与lock-free中的结果一致)
伪代码如下:
wait-free-method(void* value) { // fast-path for(int i = MAX1; i--;) { before; Record* head = Head; if(head->handle == NULL) { Record* myRecord = NewValueRecord(head, value); if(CAS(&Head, head, myRecord)) { after; if((--meHandle->fastTimes) == 0) { meHandle->fastTimes = MAX2; Handle_t* handle = peekOneHandle(shareMemory); if(handle != NULL) HELP(handle); } return; } } else { // help the Record head HELP_TRANSFER(head); } } // construct myself request into shareMemory meHandle->value = value; meHandle->wrapper->isFinish= false; meHandle->wrapper->record = NULL; putMeHandleIn(shareMemory, meHandle); // slow-path HELP(meHandle); while(there is handle in shareMemory before meHandle) HELP(handle);}HELP(Handle_t* handle) { before; Record* head = Head; while(!handle->wrapper->isFinish) { // Record* head = Head; 这一行应该在测试isFinish之前 if(head->handle == NULL) { Record* myRecord = NewHandleRecord(head, handle); if(!CAS(&Head, head, myRecord)) { head = Head; continue; } head = myRecord; } // help the Record head HELP_TRANSFER(head); before; head = Head; } head = handle->wrapper->record; HELP_TRANSFER_VALUE(head, handle);}HELP_TRANSFER(Record* head) { Handle_t* handle = head->handle; finishHandle(shareMemory, handle); Wrapper* wrapper = handle->wrapper; if(!wrapper->isFinish) { //transfer to state 2. Wrapper* newWrapper = NewWrapper(head, true); CAS(&handle->wrapper, wrapper, newWrapper); } HELP_TRANSFER_VALUE(head, handle);}HELP_TRANSFER_VALUE(Record* head, Handle_t* handle) { if(Head == head) { //transfer to state 3. Record* myRecord = NewValueRecord(head, handle->value); if(CAS(&Head, head, myRecord)) after; }}
简化一下,如下伪代码:
wait-free-method(void* value) { // fast-path for(int i = MAX1; i--;) { ....... if(!there is a requestHandle in Record) { if(try myRequest) { if(successTimes is enough) { helpOneRecordInShareMemory(); } return; } } else { helpTheRecord(); } } // construct my request handle(value/Wrapper(Record*:NULL, isFinish:false)); construct myself request into shareMemory // slow-path while(my request is not finish) { if(!there is a request in Record) { if(!try myRequest) continue; assert(there is a request in Record: myRequest); } helpTheRecordFinish(); } helpTheRecordFinishValue();}
这里面的shareMemory,强烈建议改造中的队列来使用,这里需要的是个类似队列的数据结构。
从而达到对于shareMemory的操作也是无等待的,从而保证了整个算法最强的wait-freedom。