different strategy for mutex in protocol class, fix for sending jitter buffer change message when no connection is established

This commit is contained in:
Volker Fischer 2006-03-08 18:44:21 +00:00
parent 736a7b043a
commit a4610ef287
3 changed files with 145 additions and 121 deletions

View file

@ -87,7 +87,14 @@ public:
int GetSockBufSize() { return SockBuf.GetSize(); } int GetSockBufSize() { return SockBuf.GetSize(); }
// network protocol interface // network protocol interface
void CreateJitBufMes ( const int iJitBufSize ) { Protocol.CreateJitBufMes ( iJitBufSize ); } void CreateJitBufMes ( const int iJitBufSize )
{
if ( IsConnected() )
{
Protocol.CreateJitBufMes ( iJitBufSize );
}
}
void CreateReqJitBufMes() { Protocol.CreateReqJitBufMes(); } void CreateReqJitBufMes() { Protocol.CreateReqJitBufMes(); }
protected: protected:

View file

@ -30,7 +30,7 @@ MAIN FRAME
MESSAGES MESSAGES
-------- --------
- Acknowledgement message: PROTMESSID_ACKN - Acknowledgement message: PROTMESSID_ACKN
+-----------------------------------+ +-----------------------------------+
| 2 bytes ID of message to be ackn. | | 2 bytes ID of message to be ackn. |
@ -39,14 +39,15 @@ MESSAGES
note: the cnt value is the same as of the message to be acknowledged note: the cnt value is the same as of the message to be acknowledged
- Jitter buffer size: PROTMESSID_JITT_BUF_SIZE - Jitter buffer size: PROTMESSID_JITT_BUF_SIZE
+--------------------------+ +--------------------------+
| 2 bytes number of blocks | | 2 bytes number of blocks |
+--------------------------+ +--------------------------+
- Request jitter buffer size: PROTMESSID_REQ_JITT_BUF_SIZE - Request jitter buffer size: PROTMESSID_REQ_JITT_BUF_SIZE
no data
note: does not have any data -> n = 0
@ -95,8 +96,12 @@ void CProtocol::EnqueueMessage ( CVector<uint8_t>& vecMessage,
// create send message object for the queue // create send message object for the queue
CSendMessage SendMessageObj ( vecMessage, iCnt, iID ); CSendMessage SendMessageObj ( vecMessage, iCnt, iID );
// we want to have a FIFO: we add at the end and take from the beginning Mutex.lock();
SendMessQueue.push_back ( SendMessageObj ); {
// we want to have a FIFO: we add at the end and take from the beginning
SendMessQueue.push_back ( SendMessageObj );
}
Mutex.unlock();
// if list was empty, initiate send process // if list was empty, initiate send process
if ( bListWasEmpty ) if ( bListWasEmpty )
@ -107,12 +112,27 @@ void CProtocol::EnqueueMessage ( CVector<uint8_t>& vecMessage,
void CProtocol::SendMessage() void CProtocol::SendMessage()
{ {
// we have to check that list is not empty, since in another thread the CVector<uint8_t> vecMessage;
// last element of the list might have been erased bool bSendMess = false;
if ( !SendMessQueue.empty() )
Mutex.lock();
{
// we have to check that list is not empty, since in another thread the
// last element of the list might have been erased
if ( !SendMessQueue.empty() )
{
vecMessage.Init ( SendMessQueue.front().vecMessage.Size() );
vecMessage = SendMessQueue.front().vecMessage;
bSendMess = true;
}
}
Mutex.unlock();
if ( bSendMess )
{ {
// send message // send message
emit MessReadyForSending ( SendMessQueue.front().vecMessage ); emit MessReadyForSending ( vecMessage );
// start time-out timer if not active // start time-out timer if not active
if ( !TimerSendMess.isActive() ) if ( !TimerSendMess.isActive() )
@ -144,27 +164,11 @@ void CProtocol::CreateAndSendAcknMess ( const int& iID, const int& iCnt )
} }
void CProtocol::DeleteSendMessQueue() void CProtocol::DeleteSendMessQueue()
{
/*
Actually, this function must be secured by a mutex, too. The problem is that
this function is called from OnSendProtMessage in client which is called
from a mutexed function of this object. In this case, we will get a dead
lock.
*/
// delete complete "send message queue"
SendMessQueue.clear();
}
/*
The following functions are access functions from different threads. These
functions have to be secured by a mutex to avoid data corruption
*/
void CProtocol::OnTimerSendMess()
{ {
Mutex.lock(); Mutex.lock();
{ {
SendMessage(); // delete complete "send message queue"
SendMessQueue.clear();
} }
Mutex.unlock(); Mutex.unlock();
} }
@ -175,49 +179,49 @@ bool CProtocol::ParseMessage ( const CVector<unsigned char>& vecbyData,
/* /*
return code: true -> ok; false -> error return code: true -> ok; false -> error
*/ */
bool bRet; bool bRet;
int iRecCounter, iRecID, iData;
Mutex.lock(); unsigned int iPos;
{ CVector<uint8_t> vecData;
int iRecCounter, iRecID, iData; bool bSendNextMess;
unsigned int iPos;
CVector<uint8_t> vecData;
// convert unsigned char in uint8_t, TODO convert all buffers in uint8_t // convert unsigned char in uint8_t, TODO convert all buffers in uint8_t
CVector<uint8_t> vecbyDataConv ( iNumBytes ); CVector<uint8_t> vecbyDataConv ( iNumBytes );
for ( int i = 0; i < iNumBytes; i++ ) { for ( int i = 0; i < iNumBytes; i++ ) {
vecbyDataConv[i] = static_cast<uint8_t> ( vecbyData[i] ); vecbyDataConv[i] = static_cast<uint8_t> ( vecbyData[i] );
} }
// important: vecbyDataConv must have iNumBytes to get it work!!! // important: vecbyDataConv must have iNumBytes to get it work!!!
if ( ParseMessageFrame ( vecbyDataConv, iRecCounter, iRecID, vecData ) ) if ( ParseMessageFrame ( vecbyDataConv, iRecCounter, iRecID, vecData ) )
{
// In case we received a message and returned an answer but our answer
// did not make it to the receiver, he will resend his message. We check
// here if the message is the same as the old one, and if this is the
// case, just resend our old answer again
if ( ( iOldRecID == iRecID ) && ( iOldRecCnt == iRecCounter ) )
{ {
// In case we received a message and returned an answer but our answer // acknowledgments are not acknowledged
// did not make it to the receiver, he will resend his message. We check if ( iRecID != PROTMESSID_ACKN )
// here if the message is the same as the old one, and if this is the
// case, just resend our old answer again
if ( ( iOldRecID == iRecID ) && ( iOldRecCnt == iRecCounter ) )
{ {
// acknowledgments are not acknowledged // re-send acknowledgement
if ( iRecID != PROTMESSID_ACKN ) CreateAndSendAcknMess ( iRecID, iRecCounter );
{
// re-send acknowledgement
CreateAndSendAcknMess ( iRecID, iRecCounter );
}
} }
else }
else
{
// check which type of message we received and do action
switch ( iRecID )
{ {
// check which type of message we received and do action case PROTMESSID_ACKN:
switch ( iRecID )
// extract data from stream and emit signal for received value
iPos = 0;
iData = static_cast<int> ( GetValFromStream ( vecData, iPos, 2 ) );
Mutex.lock();
{ {
case PROTMESSID_ACKN:
// extract data from stream and emit signal for received value
iPos = 0;
iData = static_cast<int> ( GetValFromStream ( vecData, iPos, 2 ) );
// check if this is the correct acknowledgment // check if this is the correct acknowledgment
if ( ( SendMessQueue.front().iCnt == iRecCounter ) && if ( ( SendMessQueue.front().iCnt == iRecCounter ) &&
( SendMessQueue.front().iID == iData ) ) ( SendMessQueue.front().iID == iData ) )
@ -226,98 +230,109 @@ for ( int i = 0; i < iNumBytes; i++ ) {
SendMessQueue.pop_front(); SendMessQueue.pop_front();
// send next message in queue // send next message in queue
SendMessage(); bSendNextMess = true;
}
else
{
bSendNextMess = false;
} }
break;
case PROTMESSID_JITT_BUF_SIZE:
// extract data from stream and emit signal for received value
iPos = 0;
iData = static_cast<int> ( GetValFromStream ( vecData, iPos, 2 ) );
// invoke message action
emit ChangeJittBufSize ( iData );
// send acknowledge message
CreateAndSendAcknMess ( iRecID, iRecCounter );
break;
case PROTMESSID_REQ_JITT_BUF_SIZE:
// invoke message action
emit ReqJittBufSize();
// send acknowledge message
CreateAndSendAcknMess ( iRecID, iRecCounter );
break;
} }
Mutex.unlock();
if ( bSendNextMess )
{
SendMessage();
}
break;
case PROTMESSID_JITT_BUF_SIZE:
// extract data from stream and emit signal for received value
iPos = 0;
iData = static_cast<int> ( GetValFromStream ( vecData, iPos, 2 ) );
// invoke message action
emit ChangeJittBufSize ( iData );
// send acknowledge message
CreateAndSendAcknMess ( iRecID, iRecCounter );
break;
case PROTMESSID_REQ_JITT_BUF_SIZE:
// invoke message action
emit ReqJittBufSize();
// send acknowledge message
CreateAndSendAcknMess ( iRecID, iRecCounter );
break;
} }
// save current message ID and counter to find out if message was re-sent
iOldRecID = iRecID;
iOldRecCnt = iRecCounter;
bRet = true; // everything was ok
}
else
{
bRet = false; // return error code
} }
// save current message ID and counter to find out if message was re-sent
iOldRecID = iRecID;
iOldRecCnt = iRecCounter;
bRet = true; // everything was ok
}
else
{
bRet = false; // return error code
} }
Mutex.unlock();
return bRet; return bRet;
} }
void CProtocol::CreateJitBufMes ( const int iJitBufSize ) void CProtocol::CreateJitBufMes ( const int iJitBufSize )
{ {
CVector<uint8_t> vecNewMessage;
CVector<uint8_t> vecData ( 2 ); // 2 bytes of data
int iCurCounter;
unsigned int iPos = 0; // init position pointer
Mutex.lock(); Mutex.lock();
{ {
CVector<uint8_t> vecNewMessage;
CVector<uint8_t> vecData ( 2 ); // 2 bytes of data
unsigned int iPos = 0; // init position pointer
// store current counter value // store current counter value
const int iCurCounter = iCounter; iCurCounter = iCounter;
// increase counter (wraps around automatically) // increase counter (wraps around automatically)
iCounter++; iCounter++;
// build data vector
PutValOnStream ( vecData, iPos, static_cast<uint32_t> ( iJitBufSize ), 2 );
// build complete message
GenMessageFrame ( vecNewMessage, iCurCounter, PROTMESSID_JITT_BUF_SIZE, vecData );
// enqueue message
EnqueueMessage ( vecNewMessage, iCurCounter, PROTMESSID_JITT_BUF_SIZE );
} }
Mutex.unlock(); Mutex.unlock();
// build data vector
PutValOnStream ( vecData, iPos, static_cast<uint32_t> ( iJitBufSize ), 2 );
// build complete message
GenMessageFrame ( vecNewMessage, iCurCounter, PROTMESSID_JITT_BUF_SIZE, vecData );
// enqueue message
EnqueueMessage ( vecNewMessage, iCurCounter, PROTMESSID_JITT_BUF_SIZE );
} }
void CProtocol::CreateReqJitBufMes() void CProtocol::CreateReqJitBufMes()
{ {
CVector<uint8_t> vecNewMessage;
int iCurCounter;
Mutex.lock(); Mutex.lock();
{ {
CVector<uint8_t> vecNewMessage;
// store current counter value // store current counter value
const int iCurCounter = iCounter; iCurCounter = iCounter;
// increase counter (wraps around automatically) // increase counter (wraps around automatically)
iCounter++; iCounter++;
// build complete message
GenMessageFrame ( vecNewMessage, iCurCounter, PROTMESSID_REQ_JITT_BUF_SIZE, CVector<uint8_t> ( 0 ) );
// enqueue message
EnqueueMessage ( vecNewMessage, iCurCounter, PROTMESSID_REQ_JITT_BUF_SIZE );
} }
Mutex.unlock(); Mutex.unlock();
// build complete message
GenMessageFrame ( vecNewMessage, iCurCounter, PROTMESSID_REQ_JITT_BUF_SIZE, CVector<uint8_t> ( 0 ) );
// enqueue message
EnqueueMessage ( vecNewMessage, iCurCounter, PROTMESSID_REQ_JITT_BUF_SIZE );
} }

View file

@ -117,15 +117,17 @@ protected:
void SendMessage(); void SendMessage();
uint8_t iCounter;
int iOldRecID, iOldRecCnt; int iOldRecID, iOldRecCnt;
// these two objects must be sequred by a mutex
uint8_t iCounter;
std::list<CSendMessage> SendMessQueue; std::list<CSendMessage> SendMessQueue;
QTimer TimerSendMess; QTimer TimerSendMess;
QMutex Mutex; QMutex Mutex;
public slots: public slots:
void OnTimerSendMess(); void OnTimerSendMess() { SendMessage(); }
signals: signals:
// transmitting // transmitting