From a9183633bca362d204889664165dc1d8fc9c7fd9 Mon Sep 17 00:00:00 2001 From: Volker Fischer Date: Sat, 4 Mar 2006 10:24:40 +0000 Subject: [PATCH] some more protocol implementations --- src/channel.cpp | 25 +++++++++++- src/channel.h | 23 ++++++----- src/protocol.cpp | 102 ++++++++++++++++++++++++++++++++++++++++++----- src/protocol.h | 53 ++++++++++++++++++++---- 4 files changed, 171 insertions(+), 32 deletions(-) diff --git a/src/channel.cpp b/src/channel.cpp index 5f9e0dd3..efad2bde 100755 --- a/src/channel.cpp +++ b/src/channel.cpp @@ -215,14 +215,28 @@ CChannel::CChannel () /* connections ---------------------------------------------------------- */ - // just route message through this class QObject::connect ( &Protocol, SIGNAL ( MessReadyForSending ( CVector ) ), - SIGNAL ( MessReadyForSending ( CVector ) ) ); + this, SLOT ( OnSendProtMessage ( CVector ) ) ); QObject::connect ( &Protocol, SIGNAL ( ChangeJittBufSize ( int ) ), this, SLOT ( OnJittBufSizeChange ( int ) ) ); } + +void CChannel::OnSendProtMessage ( CVector vecMessage ) +{ + // only send messages if we are connected, otherwise delete complete queue +// if ( IsConnected () ) +// { + // emit message to actually send the data + emit MessReadyForSending ( vecMessage ); +// } +// else +// { +// // delete send message queue +// Protocol.DeleteSendMessQueue(); +// } +} void CChannel::SetSockBufSize ( const int iNewBlockSize, const int iNumBlocks ) { @@ -342,6 +356,13 @@ CVector CChannel::PrepSendPacket(const CVector& vecsNPacke return vecbySendBuf; } + + + + + + + /******************************************************************************\ * CSampleOffsetEst * diff --git a/src/channel.h b/src/channel.h index fa65205c..c0001f81 100755 --- a/src/channel.h +++ b/src/channel.h @@ -108,32 +108,33 @@ protected: int iAudComprSize; /* resampling */ - CResample ResampleObj; - double dSamRateOffset; - CVector vecdResInData; - CVector vecdResOutData; + CResample ResampleObj; + double dSamRateOffset; + CVector vecdResInData; + CVector vecdResOutData; /* connection parameters */ - CHostAddress InetAddr; + CHostAddress InetAddr; /* network jitter-buffer */ - CNetBuf SockBuf; + CNetBuf SockBuf; /* network output conversion buffer */ - CConvBuf ConvBuf; + CConvBuf ConvBuf; // network protocol - CProtocol Protocol; + CProtocol Protocol; /* time stamp index counter */ - Q_UINT8 byTimeStampIdxCnt; - int iTimeStampActCnt; + Q_UINT8 byTimeStampIdxCnt; + int iTimeStampActCnt; - int iConTimeOut; + int iConTimeOut; QMutex Mutex; public slots: + void OnSendProtMessage ( CVector vecMessage ); void OnJittBufSizeChange ( int iNewJitBufSize ); signals: diff --git a/src/protocol.cpp b/src/protocol.cpp index 3375a305..b5037800 100755 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -74,19 +74,69 @@ MESSAGES /* Implementation *************************************************************/ -void CProtocol::EnqueueMessage ( CVector& vecMessage ) -{ - /* TODO */ - -emit MessReadyForSending ( vecMessage ); - -} // TODO take care of mutexing ressources!!!!!! +CProtocol::CProtocol() : iCounter ( 0 ), iOldRecID ( PROTMESSID_ILLEGAL ), + iOldRecCnt ( 0 ) +{ + SendMessQueue.clear(); + + // connections + QObject::connect ( &TimerSendMess, SIGNAL ( timeout() ), + this, SLOT ( OnTimerSendMess() ) ); +} + +void CProtocol::EnqueueMessage ( CVector& vecMessage, + const int iCnt, + const int iID ) +{ + // check if list is empty so that we have to initiate a send process + const bool bListWasEmpty = SendMessQueue.empty(); + + // create send message object for the queue + CSendMessage SendMessageObj ( vecMessage, iCnt, iID ); + + // we want to have a FIFO: we add at the end and take from the beginning + SendMessQueue.push_back ( SendMessageObj ); + + // if list was empty, initiate send process + if ( bListWasEmpty ) + { + SendMessage(); + } +} + +void CProtocol::SendMessage() +{ + // we have to check that list is not empty, since in another thread the + // last element of the list might have been erased + if ( !SendMessQueue.empty() ) + { + // send message + emit MessReadyForSending ( SendMessQueue.front().vecMessage ); + + // start time-out timer if not active + if ( !TimerSendMess.isActive() ) + { + TimerSendMess.start ( SEND_MESS_TIMEOUT_MS ); + } + } + else + { + // no message to send, stop timer + TimerSendMess.stop(); + } +} + +void CProtocol::DeleteSendMessQueue() +{ + // delete complete "send message queue" + SendMessQueue.clear(); +} bool CProtocol::ParseMessage ( const CVector& vecbyData, const int iNumBytes ) @@ -131,10 +181,20 @@ for ( int i = 0; i < iNumBytes; i++ ) { { case PROTMESSID_ACKN: -// TODO - + // extract data from stream and emit signal for received value + iPos = 0; + iData = static_cast ( GetValFromStream ( vecData, iPos, 2 ) ); + // check if this is the correct acknowledgment + if ( ( SendMessQueue.front().iCnt == iRecCounter ) && + ( SendMessQueue.front().iID == iData ) ) + { + // message acknowledged, remove from queue + SendMessQueue.pop_front(); + // send next message in queue + SendMessage(); + } break; @@ -144,8 +204,10 @@ for ( int i = 0; i < iNumBytes; i++ ) { iPos = 0; iData = static_cast ( GetValFromStream ( vecData, iPos, 2 ) ); - // invoke message action and send acknowledge message + // invoke message action emit ChangeJittBufSize ( iData ); + + // send acknowledge message CreateAndSendAcknMess ( iRecID, iRecCounter ); break; @@ -197,10 +259,28 @@ void CProtocol::CreateJitBufMes ( const int iJitBufSize ) iCounter++; // enqueue message - EnqueueMessage ( vecNewMessage ); + EnqueueMessage ( vecNewMessage, iCounter, PROTMESSID_JITT_BUF_SIZE ); } + + + + + + + + + + + + + + + + + + /******************************************************************************\ * Message generation (parsing) * \******************************************************************************/ diff --git a/src/protocol.h b/src/protocol.h index 0e3eb2a1..d8991691 100755 --- a/src/protocol.h +++ b/src/protocol.h @@ -27,6 +27,8 @@ #include #include +#include +#include #include "global.h" #include "util.h" @@ -42,6 +44,9 @@ #define MESS_HEADER_LENGTH_BYTE 5 /* ID, cnt, length */ #define MESS_LEN_WITHOUT_DATA_BYTE ( MESS_HEADER_LENGTH_BYTE + 2 /* CRC */ ) +// time out for message re-send if no acknowledgement was received +#define SEND_MESS_TIMEOUT_MS 400 // ms + /* Classes ********************************************************************/ class CProtocol : public QObject @@ -49,9 +54,8 @@ class CProtocol : public QObject Q_OBJECT public: - CProtocol () : iCounter ( 0 ), - iOldRecID ( PROTMESSID_ILLEGAL ), iOldRecCnt ( 0 ) {} - virtual ~CProtocol () {} + CProtocol(); + virtual ~CProtocol() {} void CreateJitBufMes ( const int iJitBufSize ); @@ -60,9 +64,35 @@ public: bool ParseMessage ( const CVector& vecbyData, const int iNumBytes ); -protected: - void EnqueueMessage ( CVector& vecMessage ); + void DeleteSendMessQueue (); +protected: + class CSendMessage + { + public: + CSendMessage() : vecMessage ( 0 ), iID ( PROTMESSID_ILLEGAL ), + iCnt ( 0 ) {} + CSendMessage ( const CVector& nMess, const int iNCnt, + const int iNID ) : vecMessage ( nMess ), iID ( iNID ), + iCnt ( iNCnt ) {} + + CSendMessage& operator= ( const CSendMessage& NewSendMess ) + { + vecMessage.Init ( NewSendMess.vecMessage.Size() ); + vecMessage = NewSendMess.vecMessage; + + iID = NewSendMess.iID; + iCnt = NewSendMess.iCnt; + return *this; + } + + CVector vecMessage; + int iID, iCnt; + }; + + void EnqueueMessage ( CVector& vecMessage, + const int iCnt, + const int iID ); bool ParseMessageFrame ( const CVector& vecIn, int& iCnt, @@ -83,9 +113,16 @@ protected: unsigned int& iPos, const unsigned int iNumOfBytes ); - CVector vecMessage; - uint8_t iCounter; - int iOldRecID, iOldRecCnt; + void SendMessage(); + + uint8_t iCounter; + int iOldRecID, iOldRecCnt; + std::list SendMessQueue; + QTimer TimerSendMess; + QMutex Mutex; + +public slots: + void OnTimerSendMess() { SendMessage(); } signals: // transmitting