From a4610ef2878da7e8bf9601c75e642b0808415c8d Mon Sep 17 00:00:00 2001 From: Volker Fischer Date: Wed, 8 Mar 2006 18:44:21 +0000 Subject: [PATCH] different strategy for mutex in protocol class, fix for sending jitter buffer change message when no connection is established --- src/channel.h | 9 +- src/protocol.cpp | 251 +++++++++++++++++++++++++---------------------- src/protocol.h | 6 +- 3 files changed, 145 insertions(+), 121 deletions(-) diff --git a/src/channel.h b/src/channel.h index 27a0d0d6..8927aa15 100755 --- a/src/channel.h +++ b/src/channel.h @@ -87,7 +87,14 @@ public: int GetSockBufSize() { return SockBuf.GetSize(); } // network protocol interface - void CreateJitBufMes ( const int iJitBufSize ) { Protocol.CreateJitBufMes ( iJitBufSize ); } + void CreateJitBufMes ( const int iJitBufSize ) + { + if ( IsConnected() ) + { + Protocol.CreateJitBufMes ( iJitBufSize ); + } + } + void CreateReqJitBufMes() { Protocol.CreateReqJitBufMes(); } protected: diff --git a/src/protocol.cpp b/src/protocol.cpp index 0c7e506e..d47ceb4c 100755 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -30,7 +30,7 @@ MAIN FRAME MESSAGES -------- -- Acknowledgement message: PROTMESSID_ACKN +- Acknowledgement message: PROTMESSID_ACKN +-----------------------------------+ | 2 bytes ID of message to be ackn. | @@ -39,14 +39,15 @@ MESSAGES note: the cnt value is the same as of the message to be acknowledged -- Jitter buffer size: PROTMESSID_JITT_BUF_SIZE +- Jitter buffer size: PROTMESSID_JITT_BUF_SIZE +--------------------------+ | 2 bytes number of blocks | +--------------------------+ -- Request jitter buffer size: PROTMESSID_REQ_JITT_BUF_SIZE - no data +- Request jitter buffer size: PROTMESSID_REQ_JITT_BUF_SIZE + + note: does not have any data -> n = 0 @@ -95,8 +96,12 @@ void CProtocol::EnqueueMessage ( CVector& vecMessage, // create send message object for the queue CSendMessage SendMessageObj ( vecMessage, iCnt, iID ); - // we want to have a FIFO: we add at the end and take from the beginning - SendMessQueue.push_back ( SendMessageObj ); + Mutex.lock(); + { + // we want to have a FIFO: we add at the end and take from the beginning + SendMessQueue.push_back ( SendMessageObj ); + } + Mutex.unlock(); // if list was empty, initiate send process if ( bListWasEmpty ) @@ -107,12 +112,27 @@ void CProtocol::EnqueueMessage ( CVector& vecMessage, void CProtocol::SendMessage() { - // we have to check that list is not empty, since in another thread the - // last element of the list might have been erased - if ( !SendMessQueue.empty() ) + CVector vecMessage; + bool bSendMess = false; + + Mutex.lock(); + { + // we have to check that list is not empty, since in another thread the + // last element of the list might have been erased + if ( !SendMessQueue.empty() ) + { + vecMessage.Init ( SendMessQueue.front().vecMessage.Size() ); + vecMessage = SendMessQueue.front().vecMessage; + + bSendMess = true; + } + } + Mutex.unlock(); + + if ( bSendMess ) { // send message - emit MessReadyForSending ( SendMessQueue.front().vecMessage ); + emit MessReadyForSending ( vecMessage ); // start time-out timer if not active if ( !TimerSendMess.isActive() ) @@ -144,27 +164,11 @@ void CProtocol::CreateAndSendAcknMess ( const int& iID, const int& iCnt ) } void CProtocol::DeleteSendMessQueue() -{ -/* - Actually, this function must be secured by a mutex, too. The problem is that - this function is called from OnSendProtMessage in client which is called - from a mutexed function of this object. In this case, we will get a dead - lock. -*/ - // delete complete "send message queue" - SendMessQueue.clear(); -} - - -/* - The following functions are access functions from different threads. These - functions have to be secured by a mutex to avoid data corruption -*/ -void CProtocol::OnTimerSendMess() { Mutex.lock(); { - SendMessage(); + // delete complete "send message queue" + SendMessQueue.clear(); } Mutex.unlock(); } @@ -175,49 +179,49 @@ bool CProtocol::ParseMessage ( const CVector& vecbyData, /* return code: true -> ok; false -> error */ - bool bRet; - - Mutex.lock(); - { - int iRecCounter, iRecID, iData; - unsigned int iPos; - CVector vecData; + bool bRet; + int iRecCounter, iRecID, iData; + unsigned int iPos; + CVector vecData; + bool bSendNextMess; // convert unsigned char in uint8_t, TODO convert all buffers in uint8_t CVector vecbyDataConv ( iNumBytes ); for ( int i = 0; i < iNumBytes; i++ ) { - vecbyDataConv[i] = static_cast ( vecbyData[i] ); +vecbyDataConv[i] = static_cast ( vecbyData[i] ); } // important: vecbyDataConv must have iNumBytes to get it work!!! - if ( ParseMessageFrame ( vecbyDataConv, iRecCounter, iRecID, vecData ) ) + if ( ParseMessageFrame ( vecbyDataConv, iRecCounter, iRecID, vecData ) ) + { + // In case we received a message and returned an answer but our answer + // did not make it to the receiver, he will resend his message. We check + // here if the message is the same as the old one, and if this is the + // case, just resend our old answer again + if ( ( iOldRecID == iRecID ) && ( iOldRecCnt == iRecCounter ) ) { - // In case we received a message and returned an answer but our answer - // did not make it to the receiver, he will resend his message. We check - // here if the message is the same as the old one, and if this is the - // case, just resend our old answer again - if ( ( iOldRecID == iRecID ) && ( iOldRecCnt == iRecCounter ) ) + // acknowledgments are not acknowledged + if ( iRecID != PROTMESSID_ACKN ) { - // acknowledgments are not acknowledged - if ( iRecID != PROTMESSID_ACKN ) - { - // re-send acknowledgement - CreateAndSendAcknMess ( iRecID, iRecCounter ); - } + // re-send acknowledgement + CreateAndSendAcknMess ( iRecID, iRecCounter ); } - else + } + else + { + // check which type of message we received and do action + switch ( iRecID ) { - // check which type of message we received and do action - switch ( iRecID ) + case PROTMESSID_ACKN: + + // extract data from stream and emit signal for received value + iPos = 0; + iData = static_cast ( GetValFromStream ( vecData, iPos, 2 ) ); + + Mutex.lock(); { - case PROTMESSID_ACKN: - - // extract data from stream and emit signal for received value - iPos = 0; - iData = static_cast ( GetValFromStream ( vecData, iPos, 2 ) ); - // check if this is the correct acknowledgment if ( ( SendMessQueue.front().iCnt == iRecCounter ) && ( SendMessQueue.front().iID == iData ) ) @@ -226,98 +230,109 @@ for ( int i = 0; i < iNumBytes; i++ ) { SendMessQueue.pop_front(); // send next message in queue - SendMessage(); + bSendNextMess = true; + } + else + { + bSendNextMess = false; } - - break; - - case PROTMESSID_JITT_BUF_SIZE: - - // extract data from stream and emit signal for received value - iPos = 0; - iData = static_cast ( GetValFromStream ( vecData, iPos, 2 ) ); - - // invoke message action - emit ChangeJittBufSize ( iData ); - - // send acknowledge message - CreateAndSendAcknMess ( iRecID, iRecCounter ); - - break; - - case PROTMESSID_REQ_JITT_BUF_SIZE: - - // invoke message action - emit ReqJittBufSize(); - - // send acknowledge message - CreateAndSendAcknMess ( iRecID, iRecCounter ); - - break; } + Mutex.unlock(); + + if ( bSendNextMess ) + { + SendMessage(); + } + + break; + + case PROTMESSID_JITT_BUF_SIZE: + + // extract data from stream and emit signal for received value + iPos = 0; + iData = static_cast ( GetValFromStream ( vecData, iPos, 2 ) ); + + // invoke message action + emit ChangeJittBufSize ( iData ); + + // send acknowledge message + CreateAndSendAcknMess ( iRecID, iRecCounter ); + + break; + + case PROTMESSID_REQ_JITT_BUF_SIZE: + + // invoke message action + emit ReqJittBufSize(); + + // send acknowledge message + CreateAndSendAcknMess ( iRecID, iRecCounter ); + + break; } - - // save current message ID and counter to find out if message was re-sent - iOldRecID = iRecID; - iOldRecCnt = iRecCounter; - - bRet = true; // everything was ok - } - else - { - bRet = false; // return error code } + + // save current message ID and counter to find out if message was re-sent + iOldRecID = iRecID; + iOldRecCnt = iRecCounter; + + bRet = true; // everything was ok + } + else + { + bRet = false; // return error code } - Mutex.unlock(); return bRet; } void CProtocol::CreateJitBufMes ( const int iJitBufSize ) { + CVector vecNewMessage; + CVector vecData ( 2 ); // 2 bytes of data + int iCurCounter; + unsigned int iPos = 0; // init position pointer + Mutex.lock(); { - CVector vecNewMessage; - CVector vecData ( 2 ); // 2 bytes of data - unsigned int iPos = 0; // init position pointer - // store current counter value - const int iCurCounter = iCounter; + iCurCounter = iCounter; // increase counter (wraps around automatically) iCounter++; - - // build data vector - PutValOnStream ( vecData, iPos, static_cast ( iJitBufSize ), 2 ); - - // build complete message - GenMessageFrame ( vecNewMessage, iCurCounter, PROTMESSID_JITT_BUF_SIZE, vecData ); - - // enqueue message - EnqueueMessage ( vecNewMessage, iCurCounter, PROTMESSID_JITT_BUF_SIZE ); } Mutex.unlock(); + + // build data vector + PutValOnStream ( vecData, iPos, static_cast ( iJitBufSize ), 2 ); + + // build complete message + GenMessageFrame ( vecNewMessage, iCurCounter, PROTMESSID_JITT_BUF_SIZE, vecData ); + + // enqueue message + EnqueueMessage ( vecNewMessage, iCurCounter, PROTMESSID_JITT_BUF_SIZE ); } void CProtocol::CreateReqJitBufMes() { + CVector vecNewMessage; + int iCurCounter; + Mutex.lock(); { - CVector vecNewMessage; - // store current counter value - const int iCurCounter = iCounter; + iCurCounter = iCounter; // increase counter (wraps around automatically) iCounter++; - - // build complete message - GenMessageFrame ( vecNewMessage, iCurCounter, PROTMESSID_REQ_JITT_BUF_SIZE, CVector ( 0 ) ); - - // enqueue message - EnqueueMessage ( vecNewMessage, iCurCounter, PROTMESSID_REQ_JITT_BUF_SIZE ); } Mutex.unlock(); + + // build complete message + GenMessageFrame ( vecNewMessage, iCurCounter, PROTMESSID_REQ_JITT_BUF_SIZE, CVector ( 0 ) ); + + // enqueue message + EnqueueMessage ( vecNewMessage, iCurCounter, PROTMESSID_REQ_JITT_BUF_SIZE ); } diff --git a/src/protocol.h b/src/protocol.h index 0ec2333d..b5cfbe61 100755 --- a/src/protocol.h +++ b/src/protocol.h @@ -117,15 +117,17 @@ protected: void SendMessage(); - uint8_t iCounter; int iOldRecID, iOldRecCnt; + + // these two objects must be sequred by a mutex + uint8_t iCounter; std::list SendMessQueue; QTimer TimerSendMess; QMutex Mutex; public slots: - void OnTimerSendMess(); + void OnTimerSendMess() { SendMessage(); } signals: // transmitting