其他分享
首页 > 其他分享> > 多线程记录文件

多线程记录文件

作者:互联网

一个生产者多消费者的应用 , 代码里目前就先简单用了单生成单消费,

每一局麻将都需要记录:每一次的发牌,抓牌,出牌 ,吃碰杠胡听;

每一局麻将生成一个key  , 可随意自己组合

 

下面代码思路仅供参考, 还未测试过; 可修改成多线程日志

主线程唯一需要在开始的时候调用一次 start ,  记录用push;

4种模式可选, 一种不关闭文件handle, 直到一局游戏结束,再关闭

一种每次打开,写入,关闭;

BIN 还没写

DB 还没写

class ErrorInfo{
    static const SIZE_T ERR_MSG_SIZE = 1 << 13;
    static CRITICAL_SECTION err_cs;
    static std::once_flag oneflag;
public:
    static void init(){
        std::call_once(ErrorInfo::oneflag,&ErrorInfo::init_cs);
    }
    static void init_cs(){
           InitializeCriticalSectionAndSpinCount(&ErrorInfo::err_cs, 4000);
    }

    static std::string getError(DWORD err){
        EnterCriticalSection(&err_cs);
        static HANDLE g_heap = HeapCreate(0, ERR_MSG_SIZE, 0);
        static char *buf = (char*)HeapAlloc(g_heap, 0, ERR_MSG_SIZE);
        static std::string errstr;
        static std::stringstream ss;
        ss.str("");
        ss.clear();
        errstr.clear();
        DWORD syslocale = MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT);
        DWORD ret = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
            NULL, err, syslocale, buf, ERR_MSG_SIZE, NULL);

        if (!ret){
            static HMODULE hDll = LoadLibraryEx(TEXT("netmsg.dll"), NULL, DONT_RESOLVE_DLL_REFERENCES);
            if (hDll){
                //如果在dll中查找,FORMAT_MESSAGE_FROM_HMODULE 添加上去, 第2个参数填写句柄
                ret = FormatMessageA(FORMAT_MESSAGE_FROM_HMODULE | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
                    hDll, err, syslocale, buf, ERR_MSG_SIZE, NULL);
            }
        }
        if (ret && buf){
            buf[ret] = 0;
            errstr = buf;
            LeaveCriticalSection(&err_cs);
            return errstr;
        }
        ss << err;
        ss >> errstr;
        LeaveCriticalSection(&err_cs);
        return errstr;
    }
};




struct  FileHandleData{
    HANDLE hFile;
    volatile bool inUse;
    std::string filepath;
    FileHandleData(HANDLE file = INVALID_HANDLE_VALUE, bool inUse = false):hFile(file),inUse(inUse){}
};
struct  storeData{
    std::string key;
    std::string data;
    storeData(const char * key, const char *data):key(key),data(data){}
    storeData(storeData && obj) throw() : key(std::move(obj.key)),data(std::move(obj.data)){}
    storeData & operator=(storeData && obj) throw(){
        if(this != &obj){
            key = std::move(obj.key);
            data = std::move(obj.data);
        }
        return *this;
    }
};

class RecordFile
{
    CRITICAL_SECTION write_cs;
    CRITICAL_SECTION prepare_cs;
    CRITICAL_SECTION errlog_cs;
    CONDITION_VARIABLE write_cond;
    HANDLE hSem;
    deque<storeData> prepare_queue;
    deque<std::string> wait_for_close_queue;
    deque<storeData> write_queue;
    std::unordered_map<std::string,FileHandleData> write_map;
    HANDLE hThreadProducer;
    unsigned tid_producer;
    HANDLE hThreadConsumers[4];
    unsigned tid_consumers[4];
    bool bStopPrepare;
    bool bStopProcess;
    bool bAllDone;
    bool bStarted;
    int m_operate;
    std::string cur_dir_name;
    HANDLE hErrorLog;
    std::string errorLogPath;
public:
    /*
        OPENFILE 只创建一次文件HANDLE, 不关闭
        CLOSEFILE 每次写完数据就关闭文件句柄
        BIN 和 DB 还没写
    */
    enum eOperate { OPENFILE = 1, CLOSEFILE = 2, BIN = 3 , DB = 4};
    enum eSpinCount { SpinCountMin = 1000, SpinCountNormal = 2000, SpinCountMax = 4000};
    enum eSemaphoreCount{ SemMin = 200 , SemNormal = 1000 , SemMax = 2000};
    RecordFile(int operate =CLOSEFILE):bAllDone(false),
        bStarted(false),
        bStopProcess(false),
        bStopPrepare(false),
        m_operate(operate) {
        cur_dir_name = std::string(typeid(*this).name()).substr(6);
        errorLogPath = cur_dir_name + "/errorlog.log";
        ErrorInfo::init();
    }
    void push(const char * key, const char * data){
        EnterCriticalSection(&prepare_cs);
        prepare_queue.emplace_back(key,data);
        LeaveCriticalSection(&prepare_cs);
        ReleaseSemaphore(&hSem,1,0);
    }
    void start(){
        bStarted = true;
        bAllDone = false;
        bStopProcess = false;
        bStopPrepare = false;
        hThreadProducer = NULL;
        memset(hThreadConsumers,0,sizeof(hThreadConsumers));
        InitializeCriticalSectionAndSpinCount(&write_cs , SpinCountMax);
        InitializeCriticalSectionAndSpinCount(&prepare_cs , SpinCountMax);
        InitializeCriticalSectionAndSpinCount(&errlog_cs , SpinCountMax);
        InitializeConditionVariable(&write_cond);
        hSem = CreateSemaphoreW(0,0,SemMax,NULL);
        if(!RecordFile::pathExists(cur_dir_name.c_str())){
            CreateDirectoryA(cur_dir_name.c_str(),NULL);
        }
        hErrorLog = CreateFileA(errorLogPath.c_str(),GENERIC_WRITE|GENERIC_READ,FILE_SHARE_READ ,
                                NULL,OPEN_ALWAYS,FILE_ATTRIBUTE_NORMAL,NULL);
        if(hErrorLog != INVALID_HANDLE_VALUE){
            setAppendFileHandle(hErrorLog);
        }
        if(OPENFILE == m_operate || CLOSEFILE  == m_operate || BIN == m_operate ){
            hThreadProducer = (HANDLE)_beginthreadex(0,0,&RecordFile::mem_func_prepare,(void*)this,0,&tid_producer);
            hThreadConsumers[0] = (HANDLE)_beginthreadex(0,0,&RecordFile::mem_func_process ,(void*)this,0,&tid_consumers[0]);
        }
    }
    virtual ~RecordFile(){

    }
    void log_error(const std::string & str){
        if(hErrorLog == INVALID_HANDLE_VALUE){
            return;
        }
        EnterCriticalSection(&errlog_cs);
        WriteFile(hErrorLog,str.c_str(),str.size(),0,0);
        FlushFileBuffers(hErrorLog);
        LeaveCriticalSection(&errlog_cs);
    }
    static bool pathExists(const char * path){
        WIN32_FILE_ATTRIBUTE_DATA attr = { 0 };
        return 0 != GetFileAttributesExA(path, GetFileExInfoStandard, &attr);
    }
    void stop(){
        if(bAllDone || !bStarted)
            return;
        bAllDone = true;
        bStarted = false;
        EnterCriticalSection(&prepare_cs);
        bStopPrepare = true;
        LeaveCriticalSection(&prepare_cs);
        ReleaseSemaphore(&hSem,1,0);
        EnterCriticalSection(&write_cs);
        bStopProcess = true;
        LeaveCriticalSection(&write_cs);
        WakeAllConditionVariable(&write_cond);
        if(hThreadProducer){
            WaitForSingleObject(hThreadProducer,INFINITE);
            CloseHandle(hThreadProducer);
            hThreadProducer = 0;
        }
        if(hThreadConsumers[0]){
            WaitForSingleObject(hThreadConsumers[0] , INFINITE);
            CloseHandle(hThreadConsumers[0]);
            hThreadConsumers[0] = 0;
        }

        CloseHandle(hSem);
        hSem = NULL;
        DeleteCriticalSection(&write_cs);
        DeleteCriticalSection(&prepare_cs);
        DeleteCriticalSection(&errlog_cs);
    }
    bool setAppendFileHandle(HANDLE fileHandle){
        static LARGE_INTEGER pos = {0};
        if(fileHandle != INVALID_HANDLE_VALUE){
            return SetFilePointerEx(fileHandle,pos,0,FILE_END);
        }
        return false;
    }
    static unsigned int mem_func_prepare(void * arg){
        RecordFile * p = (RecordFile * )arg;
        if( OPENFILE == p->m_operate)
            return p->__prepareOpenFileMode__();
        else if(CLOSEFILE == p->m_operate)
            return p->__prepareCloseFileEachTime__();
    }
    static unsigned int mem_func_process(void * arg){
        RecordFile * p = (RecordFile * )arg;
        if(OPENFILE == p->m_operate)
            return p->__processOpenFileMode__();
        else if(CLOSEFILE == p->m_operate)
            return p->__processCloseFileEachTime__();
    }
    std::string getFileName(const char * key){
        char filepath[MAX_PATH] = {0};
        SYSTEMTIME lt = {0};
        GetLocalTime(&lt);
        int len = cur_dir_name.size();
        memcpy(filepath,cur_dir_name.c_str(),len* sizeof(char));
        sprintf_s(filepath+len,MAX_PATH - len,"/%04d.%02d.%02d %02d.%02d.%02d.%04d-%s.txt",
                  lt.wYear,lt.wMonth,lt.wDay,lt.wHour,lt.wMinute,lt.wSecond,lt.wMilliseconds,
                  key);
        return filepath;
    }
    virtual unsigned int __prepareCloseFileEachTime__(){
        std::string filepath;
        bool bInsert_ok = false;
        std::string errmsg;
        while(true){
            bInsert_ok = false;
            WaitForSingleObject(hSem,INFINITE);
            EnterCriticalSection(&prepare_cs);
            storeData data = std::move(prepare_queue.front());
            prepare_queue.pop_front();
            LeaveCriticalSection(&prepare_cs);
            EnterCriticalSection(&write_cs);
            auto iter_find = write_map.find(data.key);
            auto iter_end = write_map.end();
            if(iter_find == iter_end){
                filepath = getFileName(data.key.c_str());
                FileHandleData fileInfo;
                fileInfo.filepath = filepath;
                bInsert_ok = write_map.insert(std::make_pair(data.key,fileInfo)).second;
                if(bInsert_ok){
                    write_queue.push_back(std::move(data));
                    LeaveCriticalSection(&write_cs);
                    WakeConditionVariable(&write_cond);
                }
                else{
                    LeaveCriticalSection(&write_cs);
                    errmsg =  __FUNCTION__;
                    errmsg += " insert " + data.key + " failed\r\n";
                    log_error(errmsg);
                }
            }
            else{
                write_queue.push_back(std::move(data));
                LeaveCriticalSection(&write_cs);
                WakeConditionVariable(&write_cond);
            }
        }
        return 0;
    }
    virtual unsigned int __processCloseFileEachTime__(){
        std::string errmsg;
        bool bFind = false;
        std::string filepath;
        DWORD lastError = 0;
        bool hasError = false;
        while(true){
            bFind = false;
            hasError = false;
            EnterCriticalSection(&write_cs);
            while(!bStopProcess && write_queue.empty()){
                SleepConditionVariableCS(&write_cond,&write_cs,INFINITE);
            }
            storeData data = std::move(write_queue.front());
            write_queue.pop_front();
            auto iter_find  = write_map.find(data.key);
            bFind = iter_find == write_map.end();
            if(!bFind){
                LeaveCriticalSection(&write_cs);
                errmsg = __FUNCTION__;
                errmsg += " write_map find error\r\n";
                log_error(errmsg);
                continue;
            }
            volatile bool * pInUse = &iter_find->second.inUse;
            if(*pInUse == true){
                write_queue.push_back(std::move(data));
                LeaveCriticalSection(&write_cs);
                WakeConditionVariable(&write_cond);
                continue;
            }
            *pInUse = true;
            filepath = iter_find->second.filepath;
            LeaveCriticalSection(&write_cs);
            HANDLE hFile = CreateFileA(filepath.c_str(),GENERIC_READ|GENERIC_WRITE,
                                      FILE_SHARE_READ,0,OPEN_ALWAYS,FILE_ATTRIBUTE_NORMAL,
                                      NULL);
            if(hFile != INVALID_HANDLE_VALUE){
                setAppendFileHandle(hFile);
                if(WriteFile(hFile,data.data.c_str(),data.data.size() * sizeof(char),0,0)){
                    FlushFileBuffers(hFile);
                    CloseHandle(hFile);
                }
                else{
                    lastError = GetLastError();
                    hasError  = true;
                }
            }
            else{
                lastError = GetLastError();
                hasError = true;
            }
            EnterCriticalSection(&write_cs);
            *pInUse = false;
            LeaveCriticalSection(&write_cs);
            if(hasError){
                errmsg = ErrorInfo::getError(lastError);
                log_error(errmsg);
            }
        }
        return 0;
    }
    virtual unsigned int __prepareOpenFileMode__(){
        bool bFind = false;
        std::string filepath;
        bool bInsert_ok = false;
        std::string errmsg;
        DWORD errorNo =0;
        while(1){
            bFind = false;
            bInsert_ok = false;
            WaitForSingleObject(hSem,INFINITE);
            EnterCriticalSection(&prepare_cs);
            storeData data = std::move(prepare_queue.front());
            prepare_queue.pop_front();
            LeaveCriticalSection(&prepare_cs);
            EnterCriticalSection(&write_cs);
            auto iter = write_map.find(data.key);
            if(iter != write_map.end()){
                bFind = true;
            }
            if(!bFind){
                filepath = getFileName(data.key.c_str());
                HANDLE hFile = CreateFileA(filepath.c_str(),GENERIC_READ|GENERIC_WRITE,
                                                      FILE_SHARE_READ,0,OPEN_ALWAYS,FILE_ATTRIBUTE_NORMAL,
                                                      NULL);
                if(hFile != INVALID_HANDLE_VALUE){
                    FileHandleData fileinfo;
                    fileinfo.hFile = hFile;
                    fileinfo.filepath = filepath;
                    fileinfo.inUse = false;
                    bInsert_ok = write_map.insert(make_pair(data.key , fileinfo)).second;
                    if(bInsert_ok){
                        write_queue.push_back(std::move(data));
                        LeaveCriticalSection(&write_cs);
                        WakeConditionVariable(&write_cond);
                    }
                    else{
                        LeaveCriticalSection(&write_cs);
                        errmsg = __FUNCTION__;
                        errmsg += " write_map insert failed \r\n";
                        log_error(errmsg);
                    }
                }
                else{
                    errorNo = GetLastError();
                    LeaveCriticalSection(&write_cs);
                    errmsg = ErrorInfo::getError(errorNo);
                    log_error(errmsg);
                }
            }
            else{
                write_queue.push_back(std::move(data));
                LeaveCriticalSection(&write_cs);
                WakeConditionVariable(&write_cond);
            }
        }
        return 0;
    }
    virtual unsigned int __processOpenFileMode__(){
        bool bFind = false;
        std::string errmsg;
        DWORD errorNo = 0;
        bool hasError = false;
        while (true) {
            bFind = false;
            hasError = false;
            EnterCriticalSection(&write_cs);
            while(!bStopProcess && write_queue.empty()){
                SleepConditionVariableCS(&write_cond,&write_cs,INFINITE);
            }
            storeData data = std::move(write_queue.front());
            write_queue.pop_front();
            auto iter = write_map.find(data.key);
            if(iter != write_map.end())
                bFind = true;
            if(!bFind){
                errmsg = __FUNCTION__;
                errmsg += " write_map find error \r\n";
                LeaveCriticalSection(&write_cs);
                log_error(errmsg);
                continue;
            }
            if(iter->second.inUse){
                write_queue.push_back(std::move(data));
                LeaveCriticalSection(&write_cs);
                WakeConditionVariable(&write_cond);
                continue;
            }
            volatile bool *pInUse = &iter->second.inUse;
            HANDLE hFile = iter->second.hFile;
            LeaveCriticalSection(&write_cs);
            if( INVALID_HANDLE_VALUE == hFile){
                errmsg = __FUNCTION__;
                errmsg += " hFile : INVALID_HANDLE_VALUE\r\n";
                log_error(errmsg);
                continue;
            }
            if(WriteFile(hFile,data.data.c_str() , data.data.size() * sizeof(char),0,0)){
                FlushFileBuffers(hFile);
            }
            else{
                hasError= true;
                errorNo = GetLastError();
            }
            EnterCriticalSection(&write_cs);
            *pInUse = false;
            LeaveCriticalSection(&write_cs);
            if(hasError){
                errmsg = __FUNCTION__;
                errmsg += ErrorInfo::getError(errorNo);
                errmsg += "\r\n";
                log_error(errmsg);
            }
        }
        return 0;
    }
private:
    RecordFile(const RecordFile&);
    RecordFile& operator=(const RecordFile &);
};

 

标签:std,文件,记录,write,__,cs,多线程,data,errmsg
来源: https://blog.csdn.net/dashoumeixi/article/details/100600725