博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
A Fast Wait-Free Simulation for Lock-Free Data Structures
阅读量:6039 次
发布时间:2019-06-20

本文共 5319 字,大约阅读时间需要 17 分钟。

基于CAS(compare-and-swap)操作的无锁算法,几乎都可以采用《》来转化为无等待算法,从而获得更强的progress。

相对于《》中的算法所采用的类似无等待队列,本文换成一种基于getAndAdd+CAS的无等待队列。
本文我用自己的方式来介绍下:

  1. 如何从一个基于CAS的正确(一致性)的无锁算法转化为无等待算法,针对CAS操作建模无等待算法
  2. 同时基于环形的线程专属数据环,由于依赖于具体的数据偏移,降低了公平性。所以使用一种更快速的类似wait-free队列,从而获得更好的公平性。

几乎所有的无锁算法(先不考虑垃圾回收),对于一个线程而言,都是一个如下的过程:

要么成功(本操作make progress)、要么失败(其他线程的操作make progress)从而本操作需要更新数据(preCasOp),再次重复直到成功。并且最后要执行必要的postCasOp。

那么,对于一个value,我们将它置于一个record里:

struct Record {    void* value;};struct Record* Head;

算法的关键在于,对一个head的原子改变:

lock-free-method(void* value) {    for(;;) {        before;        Record* head = Head;        Record* myRecord = Generate(head, value);        if(CAS(&Head, head, myRecord)) {            after;            return;        }    }}

这里的问题在于,对于一个操作lock-free-method(val),它在这一行CAS(&Head, head, myRecord),我们需要在这一行取得成功才能算本操作make progress。但是我们无法确定什么时候能够取得成功。在操作系统内核的调度算法越来越好的情况下,我们很可能绝大多数情况下不会感受到某次操作的延迟。但是有没有办法能够确保,本操作能够在有限步之后完成。这种算法性质就叫做wait-freedom。

我们转化成的wait-free算法具有如下结构:

  • 每个线程都先在fast-path中尝试MAX-fast次,假如某次成功则返回,假如成功的次数达到ENOUTH-fast(per thread),那么就通过shareMemory协助处于slow-path中的操作一次。
  • 假如fast-path中尝试MAX-fast次仍未成功,则构造自己的请求信息放到shareMemroy,接着进入slow-path。
  • 进入slow-path的线程,不断地尝试,最终会在有限次尝试内,自己完成操作,或者由其他线程帮助完成。
  • 协助过程,可以抽象出取得一致性的三个接口:

    HELP(handle) HELP_TRANSFER(head) HELP_TRANSFER_VALUE(head, handle)

对于如上这个具体操作,我们可以做如下操作:

首先,给Record结构增加一个Handle_t数据类型:

typedef struct Record {    void* value;    Handle_t* handle;} Record;typedef struct Handle_t {    void* value;    struct Wrapper* wrapper;} Handle_t;typedef struct Wrapper {    bool isFinish;    Record* record;} Wrapper;

我们把lock-free-method进行改造:

  • 原来的lock-free-method作为fast-path,给定两个阈值,MAX1:让它执行够次数就退出。构造原来的请求,放入共享内存中(可以是并发队列)。 MAX2: 在fast-path中执行成功的次数,从而主动help slow-path中的请求
  • CAS操作不再像原来一样在在value的值上改变,而是根据record->handle是否为空来继续操作:为空则尝试自己的请求,不为空则尝试帮助handle对应的请求make progress。(同理,fast-path也进行修改。)
  • handle中会有个指向Record的指针,以及代表本操作是否完成的bit和version。

所以,slow-path通过引入两个中间状态(states),把本来一个CAS操作变为了三个CAS操作

  • STEP 1: (exclusive)线程A尝试把原来的Record,指向请求所对应的Handle,make progress(state 1,该Record被某个线程锚定).
  • STEP 2: (consisent)某个线程B(B为任意线程)从Record中取出Handle,并且将该Handle指向这个Record,同时设置finish bit,从而告知该次请求完成,make progress.(state 2,说明此事操作已经完成)
  • STEP 3: 线程A从自己的handle中获知操作已完成(finish bit,stete 2导致的),并且获取自己锚定的Record,更新Record的值,从而完成收尾工作,并且返回自己的操作结果(state 3,此时Record的值又回到了原来的状态,与lock-free中的结果一致)

伪代码如下:

wait-free-method(void* value) {    // fast-path    for(int i = MAX1; i--;) {        before;        Record* head = Head;        if(head->handle == NULL) {            Record* myRecord = NewValueRecord(head, value);            if(CAS(&Head, head, myRecord)) {                after;                if((--meHandle->fastTimes) == 0) {                    meHandle->fastTimes = MAX2;                    Handle_t* handle = peekOneHandle(shareMemory);                    if(handle != NULL)                        HELP(handle);                }                return;            }        } else {            // help the Record head            HELP_TRANSFER(head);        }    }    // construct myself request into shareMemory    meHandle->value = value;    meHandle->wrapper->isFinish= false;    meHandle->wrapper->record = NULL;    putMeHandleIn(shareMemory, meHandle);    // slow-path    HELP(meHandle);    while(there is handle in shareMemory before meHandle)        HELP(handle);}HELP(Handle_t* handle) {    before;    Record* head = Head;    while(!handle->wrapper->isFinish) {        // Record* head = Head; 这一行应该在测试isFinish之前        if(head->handle == NULL) {            Record* myRecord = NewHandleRecord(head, handle);            if(!CAS(&Head, head, myRecord)) {                head = Head;                continue;            }            head = myRecord;        }        // help the Record head        HELP_TRANSFER(head);        before;        head = Head;    }    head = handle->wrapper->record;    HELP_TRANSFER_VALUE(head, handle);}HELP_TRANSFER(Record* head) {    Handle_t* handle = head->handle;    finishHandle(shareMemory, handle);    Wrapper* wrapper = handle->wrapper;    if(!wrapper->isFinish) {        //transfer to state 2.        Wrapper* newWrapper = NewWrapper(head, true);        CAS(&handle->wrapper, wrapper, newWrapper);    }    HELP_TRANSFER_VALUE(head, handle);}HELP_TRANSFER_VALUE(Record* head, Handle_t* handle) {    if(Head == head) {        //transfer to state 3.        Record* myRecord = NewValueRecord(head, handle->value);        if(CAS(&Head, head, myRecord))            after;    }}

简化一下,如下伪代码:

wait-free-method(void* value) {    // fast-path    for(int i = MAX1; i--;) {        .......        if(!there is a requestHandle in Record) {            if(try myRequest) {                if(successTimes is enough) {                    helpOneRecordInShareMemory();                }                return;            }        } else {            helpTheRecord();        }    }    // construct my request handle(value/Wrapper(Record*:NULL, isFinish:false));    construct myself request into shareMemory    // slow-path    while(my request is not finish) {        if(!there is a request in Record) {            if(!try myRequest)                 continue;            assert(there is a request in Record: myRequest);        }        helpTheRecordFinish();    }    helpTheRecordFinishValue();}

这里面的shareMemory,强烈建议改造中的队列来使用,这里需要的是个类似队列的数据结构。

从而达到对于shareMemory的操作也是无等待的,从而保证了整个算法最强的wait-freedom。

转载地址:http://eplhx.baihongyu.com/

你可能感兴趣的文章
C++ 排序函数 sort(),qsort()的使用方法
查看>>
OC语言Block和协议
查看>>
使用xpath时出现noDefClass的错误(找不到某个类)
查看>>
OutputCache祥解
查看>>
【推荐】最新国外免费空间网站Hostinger
查看>>
.Net规则引擎介绍 - REngine
查看>>
微信消息回复C#
查看>>
JVM学习03_new对象的内存图讲解,以及引出static方法(转)
查看>>
I深搜
查看>>
c++面向对象的编程
查看>>
ArcMap概化之消除真曲线
查看>>
[禅悟人生]谦虚有助于自我消融
查看>>
MFC之自绘控件
查看>>
算法提高 道路和航路 SPFA 算法
查看>>
Golang 如何从socket读出所有数据
查看>>
iOS开发使用半透明模糊效果方法整理
查看>>
一道图论小题目
查看>>
Hibernate拦截器(Interceptor)与事件监听器(Listener)
查看>>
关于android im
查看>>
CSS3 transforms 3D翻开
查看>>