BeeGFS源码分析1-元数据服务概要分析
< 返回列表时间: 2019-07-02来源:OSCHINA
元数据服务是 BeeGFS 中用来维护文件和目录关系及其属性配置的服务,其多线程epoll设计实现非常高效,主要流程如下: ConnAcceptor(PThread)类(一个线程)负责监听端口,并接受客户端连接,然后把;连接信息(包含接收的套接字)写入管道; StreamListenerV2(PThread)类(多个线程,可配置)从管道读取连接信息,使用epoll轮询接收数据,然后生成IncomingPreprocessedMsgWork(Work),写入MultiWorkQueue先进先出队列; Worker(PThread)类(多个线程,可配置)从MultiWorkQueue队列取出消息进行处理。
程序初始化
主函数 创建App对象,App对象是程序的主要载体: // fhgfs_meta\source\program\main.cpp #include "Program.h" int main(int argc, char** argv) { return Program::main(argc, argv); } // fhgfs_meta\source\program\Program.cpp #include <common/toolkit/BuildTypeTk.h> #include "Program.h" #include <testing/TestRunner.h> App* Program::app = NULL; int Program::main(int argc, char** argv) { BuildTypeTk::checkDebugBuildTypes(); AbstractApp::runTimeInitsAndChecks(); // must be called before creating a new App app = new App(argc, argv); app->startInCurrentThread(); int appRes = app->getAppResult(); delete app; return appRes; }
创建ConnAcceptor 主程序中会初始化一个线程,监听服务端口,由ConnAcceptor类负责: // fhgfs_meta\source\app\App.cpp void App::initComponents(TargetConsistencyState initialConsistencyState) throw(ComponentInitException) { this->log->log(Log_DEBUG, "Initializing components..."); this->dgramListener = new DatagramListener( netFilter, localNicList, ackStore, cfg->getConnMetaPortUDP() ); if(cfg->getTuneListenerPrioShift() ) dgramListener->setPriorityShift(cfg->getTuneListenerPrioShift() ); streamListenersInit(); unsigned short listenPort = cfg->getConnMetaPortTCP(); this->connAcceptor = new ConnAcceptor(this, localNicList, listenPort); this->statsCollector = new StatsCollector(workQueue, STATSCOLLECTOR_COLLECT_INTERVAL_MS, STATSCOLLECTOR_HISTORY_LENGTH); this->buddyResyncer = new BuddyResyncer(); this->internodeSyncer = new InternodeSyncer(initialConsistencyState); this->timerQueue = new TimerQueue(1, 1); this->modificationEventFlusher = new ModificationEventFlusher(); workersInit(); commSlavesInit(); this->log->log(Log_DEBUG, "Components initialized."); }
创建StreamListener 根据配置创建多个StreamListener实例,每个实例对应线程,用于从ConnAcceptor接收新连接,已及从从连接读取数据,生成Work: // fhgfs_meta\source\app\App.cpp void App::streamListenersInit() throw(ComponentInitException) { this->numStreamListeners = cfg->getTuneNumStreamListeners(); for(unsigned i=0; i < numStreamListeners; i++) { StreamListenerV2* listener = new StreamListenerV2( std::string("StreamLis") + StringTk::uintToStr(i+1), this, workQueue); if(cfg->getTuneListenerPrioShift() ) listener->setPriorityShift(cfg->getTuneListenerPrioShift() ); if(cfg->getTuneUseAggressiveStreamPoll() ) listener->setUseAggressivePoll(); streamLisVec.push_back(listener); } }
创建WorkQueue 创建WorkQueue,用于保存StreamListener生成的Work: // fhgfs_meta\source\app\App.cpp /** * Init basic shared objects like work queues, node stores etc. */ void App::initDataObjects() throw(InvalidConfigException) { ... this->workQueue = new MultiWorkQueue(); this->commSlaveQueue = new MultiWorkQueue(); if(cfg->getTuneUsePerUserMsgQueues() ) workQueue->setIndirectWorkList(new UserWorkContainer() ); ... }
创建Worker 根据配置创建Worker线程,从WorkQueue读取Work并进行处理: // fhgfs_meta\source\app\App.cpp void App::workersInit() throw(ComponentInitException) { unsigned numWorkers = cfg->getTuneNumWorkers(); for(unsigned i=0; i < numWorkers; i++) { Worker* worker = new Worker( std::string("Worker") + StringTk::uintToStr(i+1), workQueue, QueueWorkType_INDIRECT); worker->setBufLens(cfg->getTuneWorkerBufSize(), cfg->getTuneWorkerBufSize() ); workerList.push_back(worker); } for(unsigned i=0; i < APP_WORKERS_DIRECT_NUM; i++) { Worker* worker = new Worker( std::string("DirectWorker") + StringTk::uintToStr(i+1), workQueue, QueueWorkType_DIRECT); worker->setBufLens(cfg->getTuneWorkerBufSize(), cfg->getTuneWorkerBufSize() ); workerList.push_back(worker); } }
连接监听
监听类ConnAcceptor ConnAcceptor类的定义: // fhgfs_common\source\common\components\streamlistenerv2\ConnAcceptor.h class ConnAcceptor : public PThread { public: ConnAcceptor(AbstractApp* app, NicAddressList& localNicList, unsigned short listenPort) throw(ComponentInitException); virtual ~ConnAcceptor(); private: AbstractApp* app; LogContext log; StandardSocket* tcpListenSock; StandardSocket* sdpListenSock; RDMASocket* rdmaListenSock; int epollFD; bool initSocks(unsigned short listenPort, NicListCapabilities* localNicCaps); virtual void run(); void listenLoop(); void onIncomingStandardConnection(StandardSocket* sock); void onIncomingRDMAConnection(RDMASocket* sock); void applySocketOptions(StandardSocket* sock); public: // getters & setters };
连接监听循环 使用epool来轮询监听端口,并建立新连接: // fhgfs_common\source\common\components\streamlistenerv2\ConnAcceptor.cpp void ConnAcceptor::run() { try { registerSignalHandler(); listenLoop(); log.log(Log_DEBUG, "Component stopped."); } catch(std::exception& e) { PThread::getCurrentThreadApp()->handleComponentException(e); } } void ConnAcceptor::listenLoop() { const int epollTimeoutMS = 3000; struct epoll_event epollEvents[EPOLL_EVENTS_NUM]; // (just to have these values on the stack...) const int epollFD = this->epollFD; RDMASocket* rdmaListenSock = this->rdmaListenSock; StandardSocket* sdpListenSock = this->sdpListenSock; StandardSocket* tcpListenSock = this->tcpListenSock; // wait for incoming events and handle them... while(!getSelfTerminate() ) { //log.log(Log_DEBUG, std::string("Before poll(). pollArrayLen: ") + // StringTk::uintToStr(pollArrayLen) ); int epollRes = epoll_wait(epollFD, epollEvents, EPOLL_EVENTS_NUM, epollTimeoutMS); if(unlikely(epollRes < 0) ) { // error occurred if(errno == EINTR) // ignore interruption, because the debugger causes this continue; log.logErr(std::string("Unrecoverable epoll_wait error: ") + System::getErrString() ); break; } // handle incoming connection attempts for(size_t i=0; i < (size_t)epollRes; i++) { struct epoll_event* currentEvent = &epollEvents[i]; Pollable* currentPollable = (Pollable*)currentEvent->data.ptr; //log.log(Log_DEBUG, std::string("Incoming data on FD: ") + // StringTk::intToStr(pollArray[i].fd) ); // debug in if(currentPollable == rdmaListenSock) onIncomingRDMAConnection(rdmaListenSock); else if(currentPollable == tcpListenSock) onIncomingStandardConnection(tcpListenSock); else if(currentPollable == sdpListenSock) onIncomingStandardConnection(sdpListenSock); else { // unknown connection => should never happen log.log(Log_WARNING, "Should never happen: Ignoring event for unknown connection. " "FD: " + StringTk::uintToStr(currentPollable->getFD() ) ); } } } }
套接字监听处理(派发给流) 把建立的套接字发送给指定的StreamListener: // fhgfs_common\source\common\components\streamlistenerv2\ConnAcceptor.cpp /** * Accept the incoming connection and add new socket to StreamListenerV2 queue. * * Note: This is for standard sockets like TCP and SDP. */ void ConnAcceptor::onIncomingStandardConnection(StandardSocket* sock) { try { struct sockaddr_in peerAddr; socklen_t peerAddrLen = sizeof(peerAddr); StandardSocket* acceptedSock = (StandardSocket*)sock->accept( (struct sockaddr*)&peerAddr, &peerAddrLen); // (note: level Log_DEBUG to avoid spamming the log until we have log topics) log.log(Log_DEBUG, std::string("Accepted new connection from " + Socket::endpointAddrToString(&peerAddr.sin_addr, ntohs(peerAddr.sin_port) ) ) + std::string(" [SockFD: ") + StringTk::intToStr(acceptedSock->getFD() ) + std::string("]") ); applySocketOptions(acceptedSock); // hand the socket over to a stream listener StreamListenerV2* listener = app->getStreamListenerByFD(acceptedSock->getFD() ); StreamListenerV2::SockReturnPipeInfo returnInfo( StreamListenerV2::SockPipeReturn_NEWCONN, acceptedSock); listener->getSockReturnFD()->write(&returnInfo, sizeof(returnInfo) ); } catch(SocketException& se) { log.logErr(std::string("Trying to continue after connection accept error: ") + se.what() ); } }
流处理的选择 选择StreamListener时,是根据fd的数值取模运算得来: // fhgfs_meta\source\app\App.h class App : public AbstractApp { public: /** * Get one of the available stream listeners based on the socket file descriptor number. * This is to load-balance the sockets over all available stream listeners and ensure that * sockets are not bouncing between different stream listeners. * * Note that IB connections eat two fd numbers, so 2 and multiples of 2 might not be a good * value for number of stream listeners. */ virtual StreamListenerV2* getStreamListenerByFD(int fd) { return streamLisVec[fd % numStreamListeners]; } }
数据包流处理
流处理类StreamListenerV2 StreamListener的定义: // fhgfs_common\source\common\components\streamlistenerv2\StreamListenerV2.h class StreamListenerV2 : public PThread { public: /** * This is what we will send over the socket return pipe */ struct SockReturnPipeInfo { /** * Standard constructor for creating/sending a returnInfo. */ SockReturnPipeInfo(SockPipeReturnType returnType, Socket* sock) : returnType(returnType), sock(sock) {} /** * For receiving only (no initialization of members). */ SockReturnPipeInfo() {} SockPipeReturnType returnType; Socket* sock; }; }
流处理循环 StreamListener使用epoll同时处理新连接以及数据接收: // fhgfs_common\source\common\components\streamlistenerv2\StreamListenerV2.cpp void StreamListenerV2::run() { try { registerSignalHandler(); listenLoop(); log.log(Log_DEBUG, "Component stopped."); } catch(std::exception& e) { PThread::getCurrentThreadApp()->handleComponentException(e); } } void StreamListenerV2::listenLoop() { const int epollTimeoutMS = useAggressivePoll ? 0 : 3000; struct epoll_event epollEvents[EPOLL_EVENTS_NUM]; // (just to have these values on the stack...) const int epollFD = this->epollFD; FileDescriptor* sockReturnPipeReadEnd = this->sockReturnPipe->getReadFD(); bool runRDMAConnIdleCheck = false; // true just means we call the method (not enforce the check) // wait for incoming events and handle them... while(!getSelfTerminate() ) { //log.log(Log_DEBUG, std::string("Before poll(). pollArrayLen: ") + // StringTk::uintToStr(pollArrayLen) ); int epollRes = epoll_wait(epollFD, epollEvents, EPOLL_EVENTS_NUM, epollTimeoutMS); if(unlikely(epollRes < 0) ) { // error occurred if(errno == EINTR) // ignore interruption, because the debugger causes this continue; log.logErr(std::string("Unrecoverable epoll_wait error: ") + System::getErrString() ); break; } else if(unlikely(!epollRes || (rdmaCheckForceCounter++ > RDMA_CHECK_FORCE_POLLLOOPS) ) ) { // epollRes==0 is nothing to worry about, just idle // note: we can't run idle check here directly because the check might modify the // poll set, which will be accessed in the loop below runRDMAConnIdleCheck = true; } // handle incoming data & connection attempts for(size_t i=0; i < (size_t)epollRes; i++) { struct epoll_event* currentEvent = &epollEvents[i]; Pollable* currentPollable = (Pollable*)currentEvent->data.ptr; if(currentPollable == sockReturnPipeReadEnd) onSockReturn(); else onIncomingData( (Socket*)currentPollable); } if(unlikely(runRDMAConnIdleCheck) ) { // note: whether check actually happens depends on elapsed time since last check runRDMAConnIdleCheck = false; rdmaConnIdleCheck(); } } }
新连接处理 如果是新连接,则加入epoll的fd中: // fhgfs_common\source\common\components\streamlistenerv2\StreamListenerV2.cpp /** * Receive pointer to returned socket through the sockReturnPipe and re-add it to the pollList. */ void StreamListenerV2::onSockReturn() { SockReturnPipeInfo returnInfos[SOCKRETURN_SOCKS_NUM]; // try to get multiple returnInfos at once (if available) ssize_t readRes = sockReturnPipe->getReadFD()->read(&returnInfos, sizeof(SockReturnPipeInfo) ); // loop: walk over each info and handle the contained socket for(size_t i=0; ; i++) { SockReturnPipeInfo& currentReturnInfo = returnInfos[i]; // make sure we have a complete SockReturnPipeInfo if(unlikely(readRes < (ssize_t)sizeof(SockReturnPipeInfo) ) ) { // only got a partial SockReturnPipeInfo => recv the rest char* currentReturnInfoChar = (char*)¤tReturnInfo; sockReturnPipe->getReadFD()->readExact( ¤tReturnInfoChar[readRes], sizeof(SockReturnPipeInfo)-readRes); readRes = sizeof(SockReturnPipeInfo); } // handle returnInfo depending on contained returnType... Socket* currentSock = currentReturnInfo.sock; SockPipeReturnType returnType = currentReturnInfo.returnType; switch(returnType) { case SockPipeReturn_MSGDONE_NOIMMEDIATE: { // most likely case: worker is done with a msg and now returns the sock to the epoll set struct epoll_event epollEvent; epollEvent.events = EPOLLIN | EPOLLONESHOT | EPOLLET; epollEvent.data.ptr = currentSock; int epollRes = epoll_ctl(epollFD, EPOLL_CTL_MOD, currentSock->getFD(), &epollEvent); if(likely(!epollRes) ) { // sock was successfully re-armed in epoll set pollList.add(currentSock); break; // break out of switch } else if(errno != ENOENT) { // error log.logErr("Unable to re-arm sock in epoll set. " "FD: " + StringTk::uintToStr(currentSock->getFD() ) + "; " "SockTypeNum: " + StringTk::uintToStr(currentSock->getSockType() ) + "; " "SysErr: " + System::getErrString() ); log.log(Log_NOTICE, "Disconnecting: " + currentSock->getPeername() ); delete(currentSock); break; // break out of switch } /* for ENOENT, we fall through to NEWCONN, because this socket appearently wasn't used with this stream listener yet, so we need to add it (instead of modify it) */ } // might fall through here on ENOENT case SockPipeReturn_NEWCONN: { // new conn from ConnAcceptor (or wasn't used with this stream listener yet) // add new socket file descriptor to epoll set struct epoll_event epollEvent; epollEvent.events = EPOLLIN | EPOLLONESHOT | EPOLLET; epollEvent.data.ptr = currentSock; int epollRes = epoll_ctl(epollFD, EPOLL_CTL_ADD, currentSock->getFD(), &epollEvent); if(likely(!epollRes) ) { // socket was successfully added to epoll set pollList.add(currentSock); } else { // adding to epoll set failed => unrecoverable error log.logErr("Unable to add sock to epoll set. " "FD: " + StringTk::uintToStr(currentSock->getFD() ) + " " "SockTypeNum: " + StringTk::uintToStr(currentSock->getSockType() ) + " " "SysErr: " + System::getErrString() ); log.log(Log_NOTICE, "Disconnecting: " + currentSock->getPeername() ); delete(currentSock); } } break; case SockPipeReturn_MSGDONE_WITHIMMEDIATE: { // special case: worker detected that immediate data is available after msg processing // data immediately available => recv header and so on onIncomingData(currentSock); } break; default: { // should never happen: unknown/unhandled returnType log.logErr("Should never happen: " "Unknown socket return type: " + StringTk::uintToStr(returnType) ); } break; } // end of switch(returnType) readRes -= sizeof(SockReturnPipeInfo); if(!readRes) break; // all received returnInfos have been processed } // end of "for each received SockReturnPipeInfo" loop }
数据包处理(生成工作) 生成Work(IncomingPreprocessedMsgWork),并放进队列(MultiWorkQueue): // fhgfs_common\source\common\components\streamlistenerv2\StreamListenerV2.cpp /** * Receive msg header and add the socket to the work queue. */ void StreamListenerV2::onIncomingData(Socket* sock) { // check whether this is just a false alarm from a RDMASocket if( (sock->getSockType() == NICADDRTYPE_RDMA) && isFalseAlarm( (RDMASocket*)sock) ) { return; } try { const int recvTimeoutMS = 5000; char msgHeaderBuf[NETMSG_HEADER_LENGTH]; NetMessageHeader msgHeader; // receive & deserialize message header sock->recvExactT(msgHeaderBuf, NETMSG_HEADER_LENGTH, 0, recvTimeoutMS); NetMessage::deserializeHeader(msgHeaderBuf, NETMSG_HEADER_LENGTH, &msgHeader); /* (note on header verification: we leave header verification work to the worker threads to save CPU cycles in the stream listener and instead just take what we need to know here, no matter whether the header is valid or not.) */ // create work and add it to queue //log.log(Log_DEBUG, "Creating new work for to the queue"); IncomingPreprocessedMsgWork* work = new IncomingPreprocessedMsgWork(app, sock, &msgHeader); int sockFD = sock->getFD(); /* note: we store this here for delayed pollList removal, because worker thread might disconnect, so the sock gets deleted by the worker and thus "sock->" pointer becomes invalid */ sock->setHasActivity(); // mark sock as active (for idle disconnect check) // (note: userID intToStr (not uint) because default userID (~0) looks better this way) LOG_DEBUG("StreamListenerV2::onIncomingData", Log_DEBUG, "Incoming message: " + NetMsgStrMapping().defineToStr(msgHeader.msgType) + "; " "from: " + sock->getPeername() + "; " "userID: " + StringTk::intToStr(msgHeader.msgUserID) + (msgHeader.msgTargetID ? "; targetID: " + StringTk::uintToStr(msgHeader.msgTargetID) : "") ); if (sock->getIsDirect()) getWorkQueue(msgHeader.msgTargetID)->addDirectWork(work, msgHeader.msgUserID); else getWorkQueue(msgHeader.msgTargetID)->addIndirectWork(work, msgHeader.msgUserID); /* notes on sock handling: *) no need to remove sock from epoll set, because we use edge-triggered mode with oneshot flag (which disables further events after first one has been reported). *) a sock that is closed by a worker is not a problem, because it will automatically be removed from the epoll set by the kernel. *) we just need to re-arm the epoll entry upon sock return. */ pollList.removeByFD(sockFD); return; } catch(SocketTimeoutException& e) { log.log(Log_NOTICE, "Connection timed out: " + sock->getPeername() ); } catch(SocketDisconnectException& e) { // (note: level Log_DEBUG here to avoid spamming the log until we have log topics) log.log(Log_DEBUG, std::string(e.what() ) ); } catch(SocketException& e) { log.log(Log_NOTICE, "Connection error: " + sock->getPeername() + ": " + std::string(e.what() ) ); } // socket exception occurred => cleanup pollList.removeByFD(sock->getFD() ); IncomingPreprocessedMsgWork::invalidateConnection(sock); // also includes delete(sock) }
工作处理
工人类(Worker) // fhgfs_common\source\common\components\streamlistenerv2\ConnAcceptor.cpp #define WORKER_BUFIN_SIZE (1024*1024*4) #define WORKER_BUFOUT_SIZE WORKER_BUFIN_SIZE class Worker : public PThread { public: Worker(std::string workerID, MultiWorkQueue* workQueue, QueueWorkType workType) throw(ComponentInitException); virtual ~Worker() { SAFE_FREE(bufIn); SAFE_FREE(bufOut); } private: LogContext log; bool terminateWithFullQueue; // allow self-termination when queue not empty (see setter nodes) size_t bufInLen; char* bufIn; size_t bufOutLen; char* bufOut; MultiWorkQueue* workQueue; QueueWorkType workType; HighResolutionStats stats; virtual void run(); void workLoopAnyWork(); void workLoopDirectWork(); void initBuffers(); // inliners bool maySelfTerminateNow() { if(terminateWithFullQueue || (!workQueue->getDirectWorkListSize() && !workQueue->getIndirectWorkListSize() ) ) return true; return false; } public: // setters & getters /** * Note: Do not use this after the run method of this component has been called! */ void setBufLens(size_t bufInLen, size_t bufOutLen) { this->bufInLen = bufInLen; this->bufOutLen = bufOutLen; } /** * WARNING: This will only work if there is only a single worker attached to a queue. * Otherwise the queue would need a getWorkAndDontWait() method that is used during the * termination phase of the worker, because the queue might become empty before the worker * calls waitForWork() after the maySelfTerminateNow check. */ void disableTerminationWithFullQueue() { this->terminateWithFullQueue = false; } };
工作类(Work) // fhgfs_common\source\common\components\worker\Work.h class Work { public: Work() { HighResolutionStatsTk::resetStats(&stats); } virtual ~Work() {} Work(const Work&) = delete; Work(Work&&) = delete; Work& operator=(const Work&) = delete; Work& operator=(Work&&) = delete; virtual void process(char* bufIn, unsigned bufInLen, char* bufOut, unsigned bufOutLen) = 0; protected: HighResolutionStats stats; public: HighResolutionStats* getHighResolutionStats() { return &stats; } #ifdef BEEGFS_DEBUG_PROFILING TimeFine* getAgeTime() { return &age; } private: TimeFine age; #endif }; // fhgfs_common\source\common\components\streamlistenerv2\IncomingPreprocessedMsgWork.h class IncomingPreprocessedMsgWork : public Work { public: /** * Note: Be aware that this class is only for stream connections that need to be returned * to a StreamListenerV2 after processing. * * @param msgHeader contents will be copied */ IncomingPreprocessedMsgWork(AbstractApp* app, Socket* sock, NetMessageHeader* msgHeader) { this->app = app; this->sock = sock; this->msgHeader = *msgHeader; } virtual void process(char* bufIn, unsigned bufInLen, char* bufOut, unsigned bufOutLen); static void releaseSocket(AbstractApp* app, Socket** sock, NetMessage* msg); static void invalidateConnection(Socket* sock); static bool checkRDMASocketImmediateData(AbstractApp* app, Socket* sock); private: AbstractApp* app; Socket* sock; NetMessageHeader msgHeader; };
工作循环 从WorkQueens获取Work并进行处理: // fhgfs_common\source\common\components\worker\Worker.cpp void Worker::workLoop(QueueWorkType workType) { LOG(DEBUG, "Ready", as("TID", System::getTID()), workType); workQueue->incNumWorkers(); // add this worker to queue stats while(!getSelfTerminate() || !maySelfTerminateNow() ) { Work* work = waitForWorkByType(stats, personalWorkQueue, workType); #ifdef BEEGFS_DEBUG_PROFILING TimeFine workStartTime; #endif HighResolutionStatsTk::resetStats(&stats); // prepare stats // process the work packet work->process(bufIn, bufInLen, bufOut, bufOutLen); // update stats stats.incVals.workRequests = 1; HighResolutionStatsTk::addHighResIncStats(*work->getHighResolutionStats(), stats); #ifdef BEEGFS_DEBUG_PROFILING TimeFine workEndTime; const auto workElapsedMS = workEndTime.elapsedSinceMS(&workStartTime); const auto workLatencyMS = workEndTime.elapsedSinceMS(work->getAgeTime()); if (workElapsedMS >= 10) { if (workLatencyMS >= 10) LOG_TOP(WORKQUEUES, DEBUG, "Work processed.", as("Elapsed ms", workElapsedMS), as("Total latency (ms)", workLatencyMS)); else LOG_TOP(WORKQUEUES, DEBUG, "Work processed.", as("Elapsed ms", workElapsedMS), as("Total latency (us)", workEndTime.elapsedSinceMicro(work->getAgeTime()))); } else { if (workLatencyMS >= 10) { LOG_TOP(WORKQUEUES, DEBUG, "Work processed.", as("Elapsed us", workEndTime.elapsedSinceMicro(&workStartTime)), as("Total latency (ms)", workEndTime.elapsedSinceMS(work->getAgeTime()))); } else { LOG_TOP(WORKQUEUES, DEBUG, "Work processed.", as("Elapsed us", workEndTime.elapsedSinceMicro(&workStartTime)), as("Total latency (us)", workEndTime.elapsedSinceMicro(work->getAgeTime()))); } } #endif // cleanup delete(work); } }
工作处理(消息生成和处理) 处理Work时,使用Work基类的processIncoming虚函数进行处理: // fhgfs_common\source\common\components\streamlistenerv2\IncomingPreprocessedMsgWork.cpp void IncomingPreprocessedMsgWork::process(char* bufIn, unsigned bufInLen, char* bufOut, unsigned bufOutLen) { const char* logContextStr = "Work (process incoming msg)"; const int recvTimeoutMS = 5000; unsigned numReceived = NETMSG_HEADER_LENGTH; // (header actually received by stream listener) NetMessage* msg = NULL; try { // attach stats to sock (stream listener already received the msg header) stats.incVals.netRecvBytes += NETMSG_HEADER_LENGTH; sock->setStats(&stats); // make sure msg length fits into our receive buffer unsigned msgLength = msgHeader.msgLength; unsigned msgPayloadLength = msgLength - numReceived; if(unlikely(msgPayloadLength > bufInLen) ) { // message too big LogContext(logContextStr).log(Log_NOTICE, std::string("Received a message that is too large. Disconnecting: ") + sock->getPeername() ); sock->unsetStats(); invalidateConnection(sock); return; } // receive the message payload if(msgPayloadLength) sock->recvExactT(bufIn, msgPayloadLength, 0, recvTimeoutMS); // we got the complete message buffer => create msg object AbstractApp* app = PThread::getCurrentThreadApp(); ICommonConfig* cfg = app->getCommonConfig(); AbstractNetMessageFactory* netMessageFactory = app->getNetMessageFactory(); msg = netMessageFactory->createFromPreprocessedBuf(&msgHeader, bufIn, msgPayloadLength); if(unlikely(msg->getMsgType() == NETMSGTYPE_Invalid) ) { // message invalid LogContext(logContextStr).log(Log_NOTICE, std::string("Received an invalid message. Disconnecting: ") + sock->getPeername() ); sock->unsetStats(); invalidateConnection(sock); delete(msg); return; } // process the received msg bool processRes = false; if(likely(!cfg->getConnAuthHash() || sock->getIsAuthenticated() || (msg->getMsgType() == NETMSGTYPE_AuthenticateChannel) ) ) { // auth disabled or channel is auth'ed or this is an auth msg => process NetMessage::ResponseContext rctx(NULL, sock, bufOut, bufOutLen, &stats); processRes = msg->processIncoming(rctx); } else LogContext(logContextStr).log(Log_NOTICE, std::string("Rejecting message from unauthenticated peer: ") + sock->getPeername() ); // processing completed => cleanup bool needSockRelease = msg->getReleaseSockAfterProcessing(); delete(msg); msg = NULL; if(!needSockRelease) return; // sock release was already done within msg->processIncoming() method if(unlikely(!processRes) ) { // processIncoming encountered messaging error => invalidate connection LogContext(logContextStr).log(Log_NOTICE, std::string("Problem encountered during processing of a message. Disconnecting: ") + sock->getPeername() ); invalidateConnection(sock); return; } releaseSocket(app, &sock, NULL); return; } catch(SocketTimeoutException& e) { LogContext(logContextStr).log(Log_NOTICE, std::string("Connection timed out: ") + sock->getPeername() ); } catch(SocketDisconnectException& e) { // (note: level Log_DEBUG here to avoid spamming the log until we have log topics) LogContext(logContextStr).log(Log_DEBUG, std::string(e.what() ) ); } catch(SocketException& e) { LogContext(logContextStr).log(Log_NOTICE, std::string("Connection error: ") + sock->getPeername() + std::string(": ") + std::string(e.what() ) ); } // socket exception occurred => cleanup if(msg && msg->getReleaseSockAfterProcessing() ) { sock->unsetStats(); invalidateConnection(sock); } SAFE_DELETE(msg); }
消息工厂
消息工厂类(NetMessageFactory) StreamListener收到数据时使用消息工厂类生成各种类型的消息: // fhgfs_meta\source\net\message\NetMessageFactory.h class NetMessageFactory : public AbstractNetMessageFactory { public: NetMessageFactory() {} protected: virtual NetMessage* createFromMsgType(unsigned short msgType); } ;
消息工厂初始化 // fhgfs_meta\source\app\App.cpp /** * Init basic networking data structures. * * Note: no RDMA is detected here, because this needs to be done later */ void App::initBasicNetwork() { // check if management host is defined if(!cfg->getSysMgmtdHost().length() ) throw InvalidConfigException("Management host undefined"); // prepare filter for outgoing packets/connections this->netFilter = new NetFilter(cfg->getConnNetFilterFile() ); this->tcpOnlyFilter = new NetFilter(cfg->getConnTcpOnlyFilterFile() ); // prepare filter for interfaces StringList allowedInterfaces; std::string interfacesList = cfg->getConnInterfacesList(); if(!interfacesList.empty() ) { log->log(Log_DEBUG, "Allowed interfaces: " + interfacesList); StringTk::explodeEx(interfacesList, ',', true, &allowedInterfaces); } // discover local NICs and filter them NetworkInterfaceCard::findAllInterfaces(allowedInterfaces, cfg->getConnUseSDP(), localNicList); if(localNicList.empty() ) throw InvalidConfigException("Couldn't find any usable NIC"); localNicList.sort(&NetworkInterfaceCard::nicAddrPreferenceComp); // prepare factory for incoming messages this->netMessageFactory = new NetMessageFactory(); }
生成消息 消息实例的生成均根据msgType来确定: // fhgfs_meta\source\net\message\NetMessageFactory.cpp /** * @return NetMessage that must be deleted by the caller * (msg->msgType is NETMSGTYPE_Invalid on error) */ NetMessage* NetMessageFactory::createFromMsgType(unsigned short msgType) { NetMessage* msg; switch(msgType) { // The following lines are grouped by "type of the message" and ordered alphabetically inside // the groups. There should always be one message per line to keep a clear layout (although // this might lead to lines that are longer than usual) // control messages case NETMSGTYPE_Ack: { msg = new AckMsgEx(); } break; case NETMSGTYPE_AuthenticateChannel: { msg = new AuthenticateChannelMsgEx(); } break; case NETMSGTYPE_GenericResponse: { msg = new GenericResponseMsg(); } break; case NETMSGTYPE_SetChannelDirect: { msg = new SetChannelDirectMsgEx(); } break; case NETMSGTYPE_PeerInfo: { msg = new PeerInfoMsgEx(); } break; // nodes messages case NETMSGTYPE_ChangeTargetConsistencyStatesResp: { msg = new ChangeTargetConsistencyStatesRespMsg(); } break; case NETMSGTYPE_GenericDebug: { msg = new GenericDebugMsgEx(); } break; case NETMSGTYPE_GetClientStats: { msg = new GetClientStatsMsgEx(); } break; case NETMSGTYPE_GetMirrorBuddyGroupsResp: { msg = new GetMirrorBuddyGroupsRespMsg(); } break; case NETMSGTYPE_GetNodeCapacityPools: { msg = new GetNodeCapacityPoolsMsgEx(); } break; case NETMSGTYPE_GetNodeCapacityPoolsResp: { msg = new GetNodeCapacityPoolsRespMsg(); } break; case NETMSGTYPE_GetNodes: { msg = new GetNodesMsgEx(); } break; case NETMSGTYPE_GetNodesResp: { msg = new GetNodesRespMsg(); } break; case NETMSGTYPE_GetStatesAndBuddyGroupsResp: { msg = new GetStatesAndBuddyGroupsRespMsg(); } break; case NETMSGTYPE_GetTargetMappings: { msg = new GetTargetMappingsMsgEx(); } break; case NETMSGTYPE_GetTargetMappingsResp: { msg = new GetTargetMappingsRespMsg(); } break; case NETMSGTYPE_GetTargetStatesResp: { msg = new GetTargetStatesRespMsg(); } break; case NETMSGTYPE_HeartbeatRequest: { msg = new HeartbeatRequestMsgEx(); } break; case NETMSGTYPE_Heartbeat: { msg = new HeartbeatMsgEx(); } break; case NETMSGTYPE_MapTargets: { msg = new MapTargetsMsgEx(); } break; case NETMSGTYPE_PublishCapacities: { msg = new PublishCapacitiesMsgEx(); } break; case NETMSGTYPE_RegisterNodeResp: { msg = new RegisterNodeRespMsg(); } break; case NETMSGTYPE_RemoveNode: { msg = new RemoveNodeMsgEx(); } break; case NETMSGTYPE_RemoveNodeResp: { msg = new RemoveNodeRespMsg(); } break; case NETMSGTYPE_RefreshCapacityPools: { msg = new RefreshCapacityPoolsMsgEx(); } break; case NETMSGTYPE_RefreshTargetStates: { msg = new RefreshTargetStatesMsgEx(); } break; case NETMSGTYPE_SetMirrorBuddyGroup: { msg = new SetMirrorBuddyGroupMsgEx(); } break; case NETMSGTYPE_SetRootNodeIDResp: { msg = new SetRootNodeIDRespMsg(); } break; case NETMSGTYPE_SetTargetConsistencyStates: { msg = new SetTargetConsistencyStatesMsgEx(); } break; case NETMSGTYPE_SetTargetConsistencyStatesResp: { msg = new SetTargetConsistencyStatesRespMsg(); } break; // storage messages case NETMSGTYPE_FindEntryname: { msg = new FindEntrynameMsgEx(); } break; case NETMSGTYPE_FindLinkOwner: { msg = new FindLinkOwnerMsgEx(); } break; case NETMSGTYPE_FindOwner: { msg = new FindOwnerMsgEx(); } break; case NETMSGTYPE_FindOwnerResp: { msg = new FindOwnerRespMsg(); } break; case NETMSGTYPE_GetChunkFileAttribsResp: { msg = new GetChunkFileAttribsRespMsg(); } break; case NETMSGTYPE_GetStorageTargetInfo: { msg = new GetStorageTargetInfoMsgEx(); } break; case NETMSGTYPE_GetEntryInfo: { msg = new GetEntryInfoMsgEx(); } break; case NETMSGTYPE_GetEntryInfoResp: { msg = new GetEntryInfoRespMsg(); } break; case NETMSGTYPE_GetHighResStats: { msg = new GetHighResStatsMsgEx(); } break; case NETMSGTYPE_GetMetaResyncStats: { msg = new GetMetaResyncStatsMsgEx(); } break; case NETMSGTYPE_RequestExceededQuotaResp: {msg = new RequestExceededQuotaRespMsg(); } break; case NETMSGTYPE_SetExceededQuota: {msg = new SetExceededQuotaMsgEx(); } break; case NETMSGTYPE_StorageResyncStarted: { msg = new StorageResyncStartedMsgEx(); } break; case NETMSGTYPE_StorageResyncStartedResp: { msg = new StorageResyncStartedRespMsg(); } break; case NETMSGTYPE_GetXAttr: { msg = new GetXAttrMsgEx(); } break; case NETMSGTYPE_Hardlink: { msg = new HardlinkMsgEx(); } break; case NETMSGTYPE_HardlinkResp: { msg = new HardlinkRespMsg(); } break; case NETMSGTYPE_ListDirFromOffset: { msg = new ListDirFromOffsetMsgEx(); } break; case NETMSGTYPE_ListDirFromOffsetResp: { msg = new ListDirFromOffsetRespMsg(); } break; case NETMSGTYPE_ListXAttr: { msg = new ListXAttrMsgEx(); } break; case NETMSGTYPE_LookupIntent: { msg = new LookupIntentMsgEx(); } break; case NETMSGTYPE_LookupIntentResp: { msg = new LookupIntentRespMsg(); } break; case NETMSGTYPE_MkDir: { msg = new MkDirMsgEx(); } break; case NETMSGTYPE_MkDirResp: { msg = new MkDirRespMsg(); } break; case NETMSGTYPE_MkFile: { msg = new MkFileMsgEx(); } break; case NETMSGTYPE_MkFileResp: { msg = new MkFileRespMsg(); } break; case NETMSGTYPE_MkFileWithPattern: { msg = new MkFileWithPatternMsgEx(); } break; case NETMSGTYPE_MkFileWithPatternResp: { msg = new MkFileWithPatternRespMsg(); } break; case NETMSGTYPE_MkLocalDir: { msg = new MkLocalDirMsgEx(); } break; case NETMSGTYPE_MkLocalDirResp: { msg = new MkLocalDirRespMsg(); } break; case NETMSGTYPE_MkLocalFileResp: { msg = new MkLocalFileRespMsg(); } break; case NETMSGTYPE_MovingDirInsert: { msg = new MovingDirInsertMsgEx(); } break; case NETMSGTYPE_MovingDirInsertResp: { msg = new MovingDirInsertRespMsg(); } break; case NETMSGTYPE_MovingFileInsert: { msg = new MovingFileInsertMsgEx(); } break; case NETMSGTYPE_MovingFileInsertResp: { msg = new MovingFileInsertRespMsg(); } break; case NETMSGTYPE_RefreshEntryInfo: { msg = new RefreshEntryInfoMsgEx(); } break; case NETMSGTYPE_RefreshEntryInfoResp: { msg = new RefreshEntryInfoRespMsg(); } break; case NETMSGTYPE_ResyncRawInodes: { msg = new ResyncRawInodesMsgEx(); } break; case NETMSGTYPE_ResyncRawInodesResp: { msg = new ResyncRawInodesRespMsg(); } break; case NETMSGTYPE_ResyncSessionStore: { msg = new ResyncSessionStoreMsgEx(); } break; case NETMSGTYPE_ResyncSessionStoreResp: { msg = new ResyncSessionStoreRespMsg(); } break; case NETMSGTYPE_RemoveXAttr: { msg = new RemoveXAttrMsgEx(); } break; case NETMSGTYPE_RemoveXAttrResp: { msg = new RemoveXAttrRespMsg(); } break; case NETMSGTYPE_Rename: { msg = new RenameV2MsgEx(); } break; case NETMSGTYPE_RenameResp: { msg = new RenameRespMsg(); } break; case NETMSGTYPE_RmChunkPathsResp: { msg = new RmChunkPathsRespMsg(); } break; case NETMSGTYPE_RmDirEntry: { msg = new RmDirEntryMsgEx(); } break; case NETMSGTYPE_RmDir: { msg = new RmDirMsgEx(); } break; case NETMSGTYPE_RmDirResp: { msg = new RmDirRespMsg(); } break; case NETMSGTYPE_RmLocalDir: { msg = new RmLocalDirMsgEx(); } break; case NETMSGTYPE_RmLocalDirResp: { msg = new RmLocalDirRespMsg(); } break; case NETMSGTYPE_SetAttr: { msg = new SetAttrMsgEx(); } break; case NETMSGTYPE_SetAttrResp: { msg = new SetAttrRespMsg(); } break; case NETMSGTYPE_SetDirPattern: { msg = new SetDirPatternMsgEx(); } break; case NETMSGTYPE_SetDirPatternResp: { msg = new SetDirPatternRespMsg(); } break; case NETMSGTYPE_SetLocalAttrResp: { msg = new SetLocalAttrRespMsg(); } break; case NETMSGTYPE_SetMetadataMirroring: { msg = new SetMetadataMirroringMsgEx(); } break; case NETMSGTYPE_SetStorageTargetInfoResp: { msg = new SetStorageTargetInfoRespMsg(); } break; case NETMSGTYPE_SetXAttr: { msg = new SetXAttrMsgEx(); } break; case NETMSGTYPE_SetXAttrResp: { msg = new SetXAttrRespMsg(); } break; case NETMSGTYPE_Stat: { msg = new StatMsgEx(); } break; case NETMSGTYPE_StatResp: { msg = new StatRespMsg(); } break; case NETMSGTYPE_StatStoragePath: { msg = new StatStoragePathMsgEx(); } break; case NETMSGTYPE_StatStoragePathResp: { msg = new StatStoragePathRespMsg(); } break; case NETMSGTYPE_TruncFile: { msg = new TruncFileMsgEx(); } break; case NETMSGTYPE_TruncFileResp: { msg = new TruncFileRespMsg(); } break; case NETMSGTYPE_TruncLocalFileResp: { msg = new TruncLocalFileRespMsg(); } break; case NETMSGTYPE_UnlinkFile: { msg = new UnlinkFileMsgEx(); } break; case NETMSGTYPE_UnlinkFileResp: { msg = new UnlinkFileRespMsg(); } break; case NETMSGTYPE_UnlinkLocalFileResp: { msg = new UnlinkLocalFileRespMsg(); } break; case NETMSGTYPE_UpdateBacklinkResp: { msg = new UpdateBacklinkRespMsg(); } break; case NETMSGTYPE_UpdateDirParent: { msg = new UpdateDirParentMsgEx(); } break; case NETMSGTYPE_UpdateDirParentResp: { msg = new UpdateDirParentRespMsg(); } break; // session messages case NETMSGTYPE_BumpFileVersion: { msg = new BumpFileVersionMsgEx(); } break; case NETMSGTYPE_BumpFileVersionResp: { msg = new BumpFileVersionRespMsg(); } break; case NETMSGTYPE_OpenFile: { msg = new OpenFileMsgEx(); } break; case NETMSGTYPE_OpenFileResp: { msg = new OpenFileRespMsg(); } break; case NETMSGTYPE_CloseFile: { msg = new CloseFileMsgEx(); } break; case NETMSGTYPE_CloseFileResp: { msg = new CloseFileRespMsg(); } break; case NETMSGTYPE_CloseChunkFileResp: { msg = new CloseChunkFileRespMsg(); } break; case NETMSGTYPE_WriteLocalFileResp: { msg = new WriteLocalFileRespMsg(); } break; case NETMSGTYPE_FSyncLocalFileResp: { msg = new FSyncLocalFileRespMsg(); } break; case NETMSGTYPE_FLockAppend: { msg = new FLockAppendMsgEx(); } break; case NETMSGTYPE_FLockEntry: { msg = new FLockEntryMsgEx(); } break; case NETMSGTYPE_FLockEntryResp: { msg = new FLockEntryRespMsg(); } break; case NETMSGTYPE_FLockRange: { msg = new FLockRangeMsgEx(); } break; case NETMSGTYPE_FLockRangeResp: { msg = new FLockRangeRespMsg(); } break; case NETMSGTYPE_GetFileVersion: { msg = new GetFileVersionMsgEx(); } break; case NETMSGTYPE_AckNotify: { msg = new AckNotifiyMsgEx(); } break; case NETMSGTYPE_AckNotifyResp: { msg = new AckNotifiyRespMsg(); } break; //admon messages case NETMSGTYPE_RequestMetaData: { msg = new RequestMetaDataMsgEx(); } break; case NETMSGTYPE_GetNodeInfo: { msg = new GetNodeInfoMsgEx(); } break; // fsck messages case NETMSGTYPE_RetrieveDirEntries: { msg = new RetrieveDirEntriesMsgEx(); } break; case NETMSGTYPE_RetrieveInodes: { msg = new RetrieveInodesMsgEx(); } break; case NETMSGTYPE_RetrieveFsIDs: { msg = new RetrieveFsIDsMsgEx(); } break; case NETMSGTYPE_DeleteDirEntries: { msg = new DeleteDirEntriesMsgEx(); } break; case NETMSGTYPE_CreateDefDirInodes: { msg = new CreateDefDirInodesMsgEx(); } break; case NETMSGTYPE_FixInodeOwners: { msg = new FixInodeOwnersMsgEx(); } break; case NETMSGTYPE_FixInodeOwnersInDentry: { msg = new FixInodeOwnersInDentryMsgEx(); } break; case NETMSGTYPE_LinkToLostAndFound: { msg = new LinkToLostAndFoundMsgEx(); } break; case NETMSGTYPE_CreateEmptyContDirs: { msg = new CreateEmptyContDirsMsgEx(); } break; case NETMSGTYPE_UpdateFileAttribs: { msg = new UpdateFileAttribsMsgEx(); } break; case NETMSGTYPE_UpdateDirAttribs: { msg = new UpdateDirAttribsMsgEx(); } break; case NETMSGTYPE_RemoveInodes: { msg = new RemoveInodesMsgEx(); } break; case NETMSGTYPE_RecreateFsIDs: { msg = new RecreateFsIDsMsgEx(); } break; case NETMSGTYPE_RecreateDentries: { msg = new RecreateDentriesMsgEx(); } break; case NETMSGTYPE_FsckSetEventLogging: { msg = new FsckSetEventLoggingMsgEx(); } break; case NETMSGTYPE_AdjustChunkPermissions: { msg = new AdjustChunkPermissionsMsgEx(); } break; default: { msg = new SimpleMsg(NETMSGTYPE_Invalid); } break; } return msg; }
热门排行