中文字幕精品亚洲无线码二区,国产黄a三级三级三级看三级,亚洲七七久久桃花影院,丰满少妇被猛烈进入,国产小视频在线观看网站

基于 IOCP 的協程(cheng)調度器——零(ling)基礎深入淺出 C++20 協程(cheng)

前言

上一篇《基于 epoll 的(de)協程調(diao)度(du)器》談到如何基于 epoll 構建一個事件驅動的協程調度器,沒有使用三方庫的原因主要是為了避免引入額外復雜度,不過只演示 Linux 未免對非 Unix 平臺的小伙伴有所不公,為此本文基于 Windows 的完成端口 (IO Completion Port:IOCP) 構建相同能力的 demo。

文(wen)章(zhang)仍然遵(zun)守之前(qian)的創作原則:

* 選(xuan)取合適的 demo 是(shi)頭等大(da)事(shi)

* 以協(xie)程為目標,涉及到的(de)新(xin)語法會簡(jian)單說明,不(bu)涉及的(de)不(bu)旁(pang)征博引

* 若語(yu)法的(de)原理非(fei)常簡單,也會(hui)簡單展開講講,有(you)利于透過現象看(kan)本質,用起來(lai)更得心應手

上一篇(pian)文(wen)章里不光引入(ru)了基于(yu)(yu)事件的調度器,還說明了如何開啟多文(wen)件并行、await_suspend 與試讀(du)的關系(xi)、singalfd 用于(yu)(yu)完美退出等(deng)話題,如果沒有(you)這些內(nei)容鋪墊,看本文時(shi)會有(you)很多地方(fang)難以理(li)解(jie),還沒看過(guo)的小伙伴,墻(qiang)裂建議先看那(nei)篇。

工(gong)具還(huan)是(shi)(shi)(shi)之(zhi)前介紹過的(de) ,這里(li)不再用到 ,主要是(shi)(shi)(shi)它不支持(chi) Windows 平臺,其(qi)實 Compiler Explorer 也只是(shi)(shi)(shi)編譯,運行(xing)的(de)話還(huan)是(shi)(shi)(shi)不太行(xing),因為它的(de)環(huan)(huan)境不支持(chi)像文件、網(wang)絡之(zhi)類的(de)異步 IO,需要用戶自行(xing)搭建(jian)開發環(huan)(huan)境。

基于完成端口的 IO 多路復用

上文(wen)中(zhong)提到(dao)了 Unix 系統中(zhong)多(duo)路復(fu)用接(jie)口的發展歷程:分別經歷了 select -> poll -> epoll/kqueue,Windows 則(ze)通過完成(cheng)端口一(yi)統江山,其實(shi)它倆(lia)調用方(fang)式差(cha)不太多(duo):

  epoll IOCP
初始化 epoll_create
CreateIoCompletionPort
關聯句柄 epoll_ctl
CreateIoCompletionPort
等待并獲取下一個事件 epoll_wait
GetQueuedCompletionStatus
投遞事件 n/a (self pipe trick) PostQueuedCompletionStatus
銷毀 close CloseHandle

而在(zai)可等(deng)待對象上,IOCP 則豐(feng)富的(de)多:

* 文件 I/O 事件??
* 文件系統變更
* 套接字(Socket)事件??
* 命名管道(Named Pipe)事件??
* 設備 I/O 事件??
* 定時器事件(結合 Waitable Timer)??

這方面能與它(ta)相提并論的恐怕只有 kqueue 了(le)。有了上面的(de)鋪墊再參考之前 epoll 的(de)實現,直接上 demo 源碼:

#include <coroutine>
#include <unordered_map>
#include <windows.h>
#include <vector>
#include <stdexcept>
#include <iostream>
#include <sstream>
#include <memory>

struct Task {
    struct promise_type {
        Task get_return_object() { return {}; }
        std::suspend_never initial_suspend() { return {}; }
        std::suspend_never final_suspend() noexcept { return {}; }
        void return_void() {}
        void unhandled_exception() { std::terminate(); }
    };
};

class IocpScheduler {
private:
    HANDLE iocp_handle;
    std::unordered_map<HANDLE, std::coroutine_handle<>> io_handles;

public:
    IocpScheduler() {
        iocp_handle = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
        if (iocp_handle == NULL) {
            throw std::runtime_error("CreateIoCompletionPort failed");
        }
    }

    ~IocpScheduler() {
        CloseHandle(iocp_handle);
    }

    void register_io(HANDLE file_handle, std::coroutine_handle<> handle) {
        if (io_handles.find(file_handle) == io_handles.end()) {
            io_handles[file_handle] = handle;

            if (CreateIoCompletionPort(file_handle, iocp_handle, (ULONG_PTR)file_handle, 0) == NULL) {
                throw std::runtime_error("CreateIoCompletionPort failed to associate file handle");
            }
        }
    }

    void run() {
        while (true) {
            DWORD bytes_transferred = 0;
            ULONG_PTR completion_key = 0;
            LPOVERLAPPED overlapped = nullptr;

            BOOL success = GetQueuedCompletionStatus(
                iocp_handle,
                &bytes_transferred,
                &completion_key,
                &overlapped,
                INFINITE);

            if (completion_key != 0) {
                HANDLE ready_handle = (HANDLE)completion_key;
                if (auto it = io_handles.find(ready_handle); it != io_handles.end()) {
                    it->second.resume();
                }
            }
        }
    }
};

struct AsyncReadAwaiter {
    IocpScheduler& sched;
    HANDLE file_handle;
    std::unique_ptr<char[]> buffer;
    DWORD buffer_size;
    OVERLAPPED overlapped;
    DWORD bytes_read;

    AsyncReadAwaiter(IocpScheduler& s, HANDLE file, DWORD size)
        : sched(s), file_handle(file), buffer_size(size), bytes_read(0) {
        buffer = std::make_unique<char[]>(size);
        ZeroMemory(&overlapped, sizeof(OVERLAPPED));
    }

    bool await_ready() const {
        return false;
    }

    void await_suspend(std::coroutine_handle<> h) {
        sched.register_io(file_handle, h);
        
        if (!ReadFile(file_handle, buffer.get(), buffer_size, &bytes_read, &overlapped)) {
            DWORD error = GetLastError();
            if (error != ERROR_IO_PENDING) {
                std::stringstream ss;
                ss << "ReadFile failed, error " << error;
                throw std::runtime_error(ss.str());
            }
        }
    }

    std::string await_resume() {
        DWORD bytes_transferred = 0;
        if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {
            DWORD error = GetLastError();
            std::stringstream ss;
            ss << "GetOverlappedResult failed, error " << error;
            throw std::runtime_error(ss.str());
        }

        return std::string(buffer.get(), bytes_transferred);
    }
};

Task async_read_file(IocpScheduler& sched, const char* path) {
    HANDLE file_handle = CreateFileA(
        path,
        GENERIC_READ,
        FILE_SHARE_READ,
        NULL,
        OPEN_EXISTING,
        FILE_FLAG_OVERLAPPED,
        NULL);

    if (file_handle == INVALID_HANDLE_VALUE) {
        std::stringstream ss;
        ss << "CreateFile failed, error " << GetLastError();
        throw std::runtime_error(ss.str());
    }

    while (true) {
        auto data = co_await AsyncReadAwaiter(sched, file_handle, 4096);
        std::cout << "Read " << data.size() << " bytes\n";
        if (data.size() == 0) {
            break;
        }
    }

    CloseHandle(file_handle);
}

int main(int argc, char* argv[]) {
    if (argc < 2) {
        std::cout << "Usage: sample file_path" << std::endl;
        return 1;
    }

    IocpScheduler scheduler;
    async_read_file(scheduler, argv[1]);
    scheduler.run();
    return 0;
}

先看編譯:

image

Compile Explorer 中(zhong)(zhong)指定最新的 msvc 編譯(yi)器(qi)和 C++20 選(xuan)項可(ke)以編譯(yi)通(tong)過,注意在 Windows 中(zhong)(zhong)選(xuan)項指定的語法與 Unix 大(da)相徑(jing)庭(ting),別弄錯了。

一點一點降低版本(ben)嘗試(shi),發(fa)現能編譯這段(duan)代碼的最(zui)低版本(ben)是(shi) msvc19.29,對應 vs16.11,如果(guo)你(ni)需(xu)要(yao)在本(ben)地(di)安(an)裝測(ce)試(shi)環境(jing)的話(hua),穩(wen)妥起見安(an)裝 msvc19.30、對應 vs17.0 也就(jiu)是(shi)  VS2022 比較好,如果(guo)本(ben)地(di)只有 VS2019,需(xu)要(yao)升級(ji)到第五個也就(jiu)是(shi)最(zui)后一個發(fa)行版才(cai)可以。

image

接下來創建一個簡(jian)單的控(kong)制臺(tai)應用包含上(shang)面的源文件(jian),需(xu)要配(pei)置一下 C++ 語(yu)言標準:

image

就可以編譯生成可執(zhi)行文件了(le),在(zai)同目(mu)錄(lu)準備(bei)一個(ge)文本文件 (test.txt) 進行測試:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
...
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 4096 bytes
Read 409
PS D:\code\iocp_coroutine\Debug>

居然死循(xun)環了。

指定偏移量

同樣(yang)的(de)代碼邏輯(ji),Unix 上沒(mei)問(wen)題 Windows 上卻(que)死循(xun)環,主要(yao)原因是(shi):前(qian)(qian)者(zhe)底層使用(yong)的(de)是(shi)管道,與 socket 之類(lei)相似(si)是(shi)一個流 (stream),因此沒(mei)有(you)讀寫偏(pian)移(yi)量的(de)說法,每(mei)次從(cong)開頭獲取(qu)就可以了;后(hou)者(zhe)使用(yong)的(de)是(shi)文件(jian)(jian),如果不(bu)指定(ding)偏(pian)移(yi)量,每(mei)次都會從(cong)位(wei)置 0 讀取(qu),有(you)的(de)讀者(zhe)可能問(wen)了,為(wei)何不(bu)能使用(yong)當前(qian)(qian)文件(jian)(jian)的(de)讀取(qu)位(wei)置呢?這是(shi)因為(wei) Windows 上的(de)多(duo)路復用(yong)底層是(shi)徹(che)徹(che)底底的(de)異步(bu)架構,必(bi)需每(mei)次為(wei) ReadFile 指定(ding)一個偏(pian)移(yi)量,而不(bu)能夠使用(yong)當前(qian)(qian)文件(jian)(jian)的(de)偏(pian)移(yi)量。

修(xiu)復的(de)(de)方(fang)法很簡單(dan),為(wei) ReadFile 的(de)(de) overlapped 參數(shu)的(de)(de) Offset & OffsetHigh 字段指定要讀取數(shu)據(ju)的(de)(de)偏移量即可:

...
    struct AsyncReadAwaiter {
    IocpScheduler& sched;
    HANDLE file_handle;
    std::unique_ptr<char[]> buffer;
    DWORD buffer_size;

增加(jia)一個引用成員用來記錄當前請求的偏(pian)移值,LARGE_INTEGER 可(ke)以理解為(wei) uint64 的結(jie)構化表(biao)達

    LARGE_INTEGER &offset; 
    OVERLAPPED overlapped;
    DWORD bytes_read;

    AsyncReadAwaiter(IocpScheduler& s, HANDLE file, LARGE_INTEGER &off, DWORD size)

在構造(zao)函數(shu)中初(chu)始化新成員,這個值(zhi)需要從外部(bu)傳(chuan)入(ru),讀(du)取成功后更新之,以便跨等待對象使用

        : sched(s), file_handle(file), buffer_size(size), offset(off), bytes_read(0) {
        buffer = std::make_unique<char[]>(size);
        ZeroMemory(&overlapped, sizeof(OVERLAPPED));
    }

    bool await_ready() const {
        return false;
    }

    void await_suspend(std::coroutine_handle<> h) {
        sched.register_io(file_handle, h);

每次(ci)請(qing)求前設置 overlapped 的偏移(yi)字(zi)段,并(bing)增加調試日志輸(shu)出以便觀察(cha)

        overlapped.Offset = offset.LowPart; 
        overlapped.OffsetHigh = offset.HighPart; 
        std::cout << "ReadFile from " << offset.QuadPart << std::endl;
        if (!ReadFile(file_handle, buffer.get(), buffer_size, &bytes_read, &overlapped)) {
            DWORD error = GetLastError();
            if (error != ERROR_IO_PENDING) {
                std::stringstream ss;
                ss << "ReadFile failed, error " << error;
                throw std::runtime_error(ss.str());
            }
        }
    }

    std::string await_resume() {
        DWORD bytes_transferred = 0;
        if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {
            DWORD error = GetLastError();
            std::stringstream ss;
            ss << "GetOverlappedResult failed, error " << error;
            throw std::runtime_error(ss.str());
        }

讀取成功后,遞增相(xiang)應的偏移量(liang)

        offset.QuadPart += bytes_transferred; 
        return std::string(buffer.get(), bytes_transferred);
    }
};

Task async_read_file(IocpScheduler& sched, const char* path) {
    HANDLE file_handle = CreateFileA(
        path,
        GENERIC_READ,
        FILE_SHARE_READ,
        NULL,
        OPEN_EXISTING,
        FILE_FLAG_OVERLAPPED,
        NULL);

    if (file_handle == INVALID_HANDLE_VALUE) {
        std::stringstream ss;
        ss << "CreateFile failed, error " << GetLastError();
        throw std::runtime_error(ss.str());
    }

在外層循環中保存這個偏移(yi)量,以便可以持久化使用,初始值為(wei) 0

    LARGE_INTEGER offset = { 0 }; 
    while (true) {
        auto data = co_await AsyncReadAwaiter(sched, file_handle, offset, 4096);
        std::cout << "Read " << data.size() << " bytes\n";
        if (data.size() == 0) {
            break;
        }
    }

    CloseHandle(file_handle);
}
...

再次(ci)運行程序,可(ke)以輸出讀取的內容了:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
ReadFile from 0
Read 4096 bytes
ReadFile from 4096
Read 456 bytes
ReadFile from 4552

但(dan)是額外的(de),也收到(dao)了一(yi)個(ge)崩潰提示:

image

處理文件 EOF

記(ji)得之前講到協程體整個是(shi)(shi)包在(zai)編譯的 try...catch 代碼塊中(zhong)的,這(zhe)里直接崩(beng)潰(kui)難道是(shi)(shi) msvc 的異常處(chu)理沒(mei)起作(zuo)用?掛上調試(shi)器(qi)看看崩(beng)潰(kui)堆棧:

image

看起(qi)來是命(ming)中 promise 對象的(de) unhandle_exception,這里調用的(de) terminate 導致崩(beng)潰(kui)框(kuang)彈出,而 unhandled_exception 恰恰是編譯器捕(bu)獲(huo)了 throw 拋(pao)出的(de)異常,與直覺剛好相反。經(jing)過(guo)排查,唯一(yi)可能拋(pao)出異常的(de)位置是這里(li):

    std::string await_resume() {
        DWORD bytes_transferred = 0;
        if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {
            DWORD error = GetLastError();
            std::stringstream ss;
            ss << "GetOverlappedResult failed, error " << error;

這里加打一行日志

            std::cerr << ss.str() << std::endl;
            throw std::runtime_error(ss.str());
        }
        
        offset.QuadPart += bytes_transferred; 
        return std::string(buffer.get(), bytes_transferred);
    }

新(xin)的(de)輸(shu)出果然提(ti)示(shi)這里返回了錯誤(wu):

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
ReadFile from 0
Read 4096 bytes
ReadFile from 4096
Read 456 bytes
ReadFile from 4552
GetOverlappedResult failed, error 38

錯誤碼 38 對應的是 ERROR_HANDLE_EOF表示文件已到末尾,相比 epoll 管道不關心數據結尾的問題,IOCP 讀文件還需要額外增加一些處理,另外在拋異常時,msvc 相比 clang 的顯示不太友好,需要在拋出異常前補上 stderr 的打印,修復后的代碼如下:

    std::string await_resume() {
        DWORD bytes_transferred = 0;
        if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {
            DWORD error = GetLastError();

判斷(duan)(duan)錯誤類型,如果是文件 EOF,直接(jie)返回空數據,上層會(hui)進(jin)行判斷(duan)(duan),從而退出讀(du)取(qu)循環

            if (error != ERROR_HANDLE_EOF) {
                std::stringstream ss;
                ss << "GetOverlappedResult failed, error " << error;
                std::cerr << ss.str() << std::endl;
                throw std::runtime_error(ss.str());
            }
            else {
                return ""; 
            }
        }

        offset.QuadPart += bytes_transferred; 
        return std::string(buffer.get(), bytes_transferred);
    }

下面是新(xin)的輸出(chu):

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
ReadFile from 0
Read 4096 bytes
ReadFile from 4096
Read 456 bytes
ReadFile from 4552
Read 0 bytes

不(bu)再報錯了。

多文件并行

上(shang)面的例子雖(sui)然通過多(duo)次讀取展示(shi)了協程多(duo)次喚(huan)醒的過程,但沒有展示(shi)多(duo)個(ge) IO 句(ju)柄并發(fa)的能力,下面稍(shao)加改造,同時讀取多(duo)個(ge)文件:

Task async_read_file(IocpScheduler& sched, const char* path) {
    HANDLE file_handle = CreateFileA(
        path,
        GENERIC_READ,
        FILE_SHARE_READ,
        NULL,
        OPEN_EXISTING,
        FILE_FLAG_OVERLAPPED,
        NULL);

    if (file_handle == INVALID_HANDLE_VALUE) {
        std::stringstream ss;
        ss << "CreateFile failed, error " << GetLastError();
        std::cerr << ss.str() << std::endl; 
        throw std::runtime_error(ss.str());
    }

    LARGE_INTEGER offset = { 0 };
    while (true) {
        auto data = co_await AsyncReadAwaiter(sched, file_handle, offset, 4096);

輸(shu)出文件句柄以區(qu)別(bie)從(cong)不同文件讀取的數據

        std::cout << "Read [" << file_handle << "] " << data.size() << " bytes\n";
        if (data.size() == 0) {
            break;
        }
    }

    CloseHandle(file_handle);
}

int main(int argc, char* argv[]) {
    if (argc < 3) {
        std::cout << "Usage: sample file1 file2" << std::endl;
        return 1;
    }

    IocpScheduler scheduler;
    async_read_file(scheduler, argv[1]);

多個文件只(zhi)需要多次調用協程(cheng)即(ji)可,從這里可以感受到協程(cheng)強大的(de)拓(tuo)展性

    async_read_file(scheduler, argv[2]);
    scheduler.run();
    return 0;
}

下面是新的(de)輸出:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt test2.txt
Read [00000108] 1024 bytes
Read [0000010C] 1024 bytes
Read [00000108] 1024 bytes
Read [0000010C] 1024 bytes
Read [00000108] 1024 bytes
Read [0000010C] 1024 bytes
Read [00000108] 1024 bytes
Read [0000010C] 1024 bytes
Read [00000108] 456 bytes
Read [0000010C] 456 bytes
Read [00000108] 0 bytes
Read [0000010C] 0 bytes

為(wei)了便于(yu)對(dui)比,這里(li)將讀取 buffer 的默認尺寸改為(wei) 1024,并去掉了調試日(ri)志。可以(yi)看出(chu)在(zai) IOCP 里(li)兩個文件基本是(shi)輪著讀的,公平性還是(shi)不錯(cuo)的。

await_suspend & 試讀

上(shang)文(wen)中(zhong)提(ti)到,通過(guo)在 await_ready 或 await_suspend 中(zhong)增加(jia)一些代碼,就(jiu)可以支(zhi)持數據試讀,從(cong)而在某些場景下提(ti)升數據吞吐能力。下面看看 IOCP 是如(ru)何實(shi)現的(de),這(zhe)里(li)只演示(shi) await_suspend 方式(shi):

    bool await_suspend(std::coroutine_handle<> h) {
        sched.register_io(file_handle, h);

        overlapped.Offset = offset.LowPart;
        overlapped.OffsetHigh = offset.HighPart;
        //std::cout << "ReadFile from " << offset.QuadPart << std::endl;
        if (!ReadFile(file_handle, buffer.get(), buffer_size, &bytes_read, &overlapped)) {
            DWORD error = GetLastError();
            if (error != ERROR_IO_PENDING) {
                std::stringstream ss;
                ss << "ReadFile failed, error " << error;
                std::cerr << ss.str() << std::endl; 
                throw std::runtime_error(ss.str());
            }
        }

ReadFile 本身(shen)具有試讀能力(li),當任務可以立即完成(cheng)時,它將(jiang)返回(hui) TRUE,bytes_read 參數將(jiang)返回(hui)讀取的數據長度;這(zhe)里加入判斷,若立即讀取成(cheng)功,則(ze)返回(hui) false 不掛起協程(cheng)

        else {
            // if immediately success, not hangup
            std::cout << "immediately success, read = " << bytes_read << std::endl; 
        }
        return bytes_read > 0 ? false : true;
    }

    std::string await_resume() {
        DWORD bytes_transferred = 0;

resume 時先判斷是否(fou)為試讀場景,是的話直接返回數據就可以了(le)

        if (bytes_read > 0) {
            bytes_transferred = bytes_read;
        }
        else {
            if (!GetOverlappedResult(file_handle, &overlapped, &bytes_transferred, FALSE)) {
                DWORD error = GetLastError();
                if (error != ERROR_HANDLE_EOF) {
                    std::stringstream ss;
                    ss << "GetOverlappedResult failed, error " << error;
                    std::cerr << ss.str() << std::endl;
                    throw std::runtime_error(ss.str());
                }
                else {
                    return "";
                }
            }
        }

        offset.QuadPart += bytes_transferred; 
        return std::string(buffer.get(), bytes_transferred);
    }

從這(zhe)里也可以(yi)看出,Windows 對(dui)直接(jie)成功的支持是比較好的,不必像 Unix 那樣來回倒騰數據,下面是新版本(ben)輸出:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
Read [000000FC] 1024 bytes
Read [000000FC] 1024 bytes
Read [000000FC] 1024 bytes
Read [000000FC] 1024 bytes
Read [000000FC] 456 bytes
Read [000000FC] 0 bytes

運行了(le)多(duo)次沒有試出來,可能 Windows 只對網絡(luo)等真正異步的(de)場景才會有立即成功(gong)的(de)情況(kuang)吧。

PostQueuedCompletionStatus & 完美退出

上面的(de) demo 如果(guo)遇到大(da)文件目前只能通過 Ctrl C 強制(zhi)殺死,畢(bi)竟調度器(qi)的(de) run 是個死循環沒法退出(chu),下面對它進行一番改造,看看能否實現完美退出(chu)

IocpScheduler g_scheduler;

由于需要在(zai)信(xin)號響應函數(shu)中通知調度器(qi)退出,這里將(jiang)它做為一個全(quan)局變量,工程化一點的話可以(yi)改成單(dan)例(li),這里偷個懶

int main(int argc, char* argv[]) {
    if (argc < 2) {
        std::cout << "Usage: sample file" << std::endl;
        return 1;
    }

初始(shi)化(hua)時捕獲 SiGINT 以(yi)便(bian)響應(ying) Ctrl C

    signal(SIGINT, on_user_exit); 
    async_read_file(g_scheduler, argv[1]);
    g_scheduler.run();
    return 0;
}

在信號響應函數中調(diao)用調(diao)度(du)器退出(chu)接口實現完美退出(chu)

void on_user_exit(int signo) {
    g_scheduler.exit(signo); 
}

class IocpScheduler {
    ...

調度器中(zhong)新增的(de)退(tui)出接口(kou),無腦給(gei) IOCP 隊(dui)列投遞(di)通知,注意 key 參數給(gei)的(de)是(shi) 0,以區(qu)別于(yu)一般(ban)的(de)文件(jian)讀取事件(jian)

    void exit(int signo) {
        std::cout << "caught signal " << signo << ", prepare to quit!" << std::endl; 
        PostQueuedCompletionStatus(iocp_handle, 0, (ULONG_PTR)0, NULL);
    }
    
    void run() {
        while (true) {
            DWORD bytes_transferred = 0;
            ULONG_PTR completion_key = 0;
            LPOVERLAPPED overlapped = nullptr;

            BOOL success = GetQueuedCompletionStatus(
                iocp_handle,
                &bytes_transferred,
                &completion_key,
                &overlapped,
                INFINITE);

收到事(shi)件后,優先檢測是否(fou)為退(tui)出(chu)事(shi)件,命(ming)中的(de)話直接 break while 循環退(tui)出(chu) main

            if (completion_key == 0) {
                std::cout << "IOCP ready to quit" << std::endl; 
                break; 
            }
            else {
                HANDLE ready_handle = (HANDLE)completion_key;
                if (auto it = io_handles.find(ready_handle); it != io_handles.end()) {
                    it->second.resume();
                }
            }
        }
    }

    ~IocpScheduler() {

調度器析構中(zhong)增加協(xie)程(cheng)的銷毀,防止內(nei)存、句柄泄漏

        for(auto handle : io_handles) {
            std::cout << "coroutine destroy" << std::endl;
            handle.second.destroy();
        }
        CloseHandle(iocp_handle);
    }
};

下面是新版輸出:

PS D:\code\iocp_coroutine\Debug> .\iocp_coroutine.exe test.txt
Read [00000110] 1024 bytes
Read [00000110] 1024 bytes
Read [00000110] 1024 bytes
Read [00000110] 1024 bytes
Read [00000110] 456 bytes
Read [00000110] 0 bytes
caught signal 2, prepare to quit!
IOCP ready to quit
coroutine destroy

用戶按下 Ctrl C 后,可以(yi)實現(xian)完美(mei)退出啦! 

結語

本文介紹了(le)一種基于 IOCP 多路復用的協程(cheng)調(diao)度器,除此之外(wai)還(huan)說(shuo)明了(le)如何妥善處理文件(jian)偏(pian)移、文件(jian) EOF、await_suspend 與試讀寫、PostQueuedCompletionStatus 與進程(cheng)完美(mei)退出等,可(ke)用于構(gou)建工(gong)業級強度的代(dai)碼。

最(zui)后,由于(yu)本文(wen)中 demo 經歷多次迭(die)代,想要(yao)復(fu)制最(zui)終(zhong)版進行驗證的小伙伴(ban),可以 follow 這個開源 git 庫獲取(qu):。

下一(yi)篇(pian)來看下,如(ru)何將現有的(de)基于回調的(de)異步庫與 C++20 協程(cheng)無縫糅合(he)。

參考 

[1]. 

[2]. 

posted @ 2025-09-22 11:34  goodcitizen  閱讀(1568)  評論(1)    收藏  舉報