added mutex for protocol

This commit is contained in:
Volker Fischer 2006-03-06 17:04:07 +00:00
parent 1595a7446b
commit fc816a7f09
8 changed files with 318 additions and 379 deletions

View file

@ -112,44 +112,50 @@ bool CChannelSet::PutData ( const CVector<unsigned char>& vecbyRecBuf,
const int iNumBytesRead, const int iNumBytesRead,
const CHostAddress& HostAdr ) const CHostAddress& HostAdr )
{ {
Mutex.lock ();
/* get channel ID ------------------------------------------------------- */
bool bChanOK = true; bool bChanOK = true;
/* check address */ Mutex.lock ();
int iCurChanID = CheckAddr ( HostAdr );
if ( iCurChanID == INVALID_CHANNEL_ID )
{ {
/* a new client is calling, look for free channel */ /* get channel ID --------------------------------------------------- */
iCurChanID = GetFreeChan (); /* check address */
int iCurChanID = CheckAddr ( HostAdr );
if ( iCurChanID != INVALID_CHANNEL_ID ) if ( iCurChanID == INVALID_CHANNEL_ID )
{ {
vecChannels[iCurChanID].SetAddress ( HostAdr ); /* a new client is calling, look for free channel */
iCurChanID = GetFreeChan ();
if ( iCurChanID != INVALID_CHANNEL_ID )
{
vecChannels[iCurChanID].SetAddress ( HostAdr );
}
else
{
bChanOK = false; /* no free channel available */
}
} }
else
/* put received data in jitter buffer ------------------------------- */
if ( bChanOK )
{ {
bChanOK = false; /* no free channel available */ /* put packet in socket buffer */
switch ( vecChannels[iCurChanID].PutData ( vecbyRecBuf, iNumBytesRead ) )
{
case PS_AUDIO_OK:
PostWinMessage ( MS_JIT_BUF_PUT, MUL_COL_LED_GREEN, iCurChanID );
break;
case PS_AUDIO_ERR:
PostWinMessage ( MS_JIT_BUF_PUT, MUL_COL_LED_RED, iCurChanID );
break;
case PS_PROT_ERR:
PostWinMessage ( MS_JIT_BUF_PUT, MUL_COL_LED_YELLOW, iCurChanID );
break;
}
} }
} }
/* put received data in jitter buffer ----------------------------------- */
if ( bChanOK )
{
/* put packet in socket buffer */
if ( vecChannels[iCurChanID].PutData ( vecbyRecBuf, iNumBytesRead ) )
{
PostWinMessage ( MS_JIT_BUF_PUT, MUL_COL_LED_GREEN, iCurChanID );
}
else
{
PostWinMessage ( MS_JIT_BUF_PUT, MUL_COL_LED_RED, iCurChanID );
}
}
Mutex.unlock (); Mutex.unlock ();
return !bChanOK; /* return 1 if error */ return !bChanOK; /* return 1 if error */
@ -167,36 +173,36 @@ void CChannelSet::GetBlockAllConC ( CVector<int>& vecChanID,
/* make put and get calls thread safe. Do not forget to unlock mutex /* make put and get calls thread safe. Do not forget to unlock mutex
afterwards! */ afterwards! */
Mutex.lock (); Mutex.lock ();
/* Check all possible channels */
for ( int i = 0; i < MAX_NUM_CHANNELS; i++ )
{ {
/* read out all input buffers to decrease timeout counter on /* Check all possible channels */
disconnected channels */ for ( int i = 0; i < MAX_NUM_CHANNELS; i++ )
const bool bGetOK = vecChannels[i].GetData ( vecdData );
if ( vecChannels[i].IsConnected () )
{ {
/* add ID and data */ /* read out all input buffers to decrease timeout counter on
vecChanID.Add ( i ); disconnected channels */
const bool bGetOK = vecChannels[i].GetData ( vecdData );
const int iOldSize = vecvecdData.Size (); if ( vecChannels[i].IsConnected () )
vecvecdData.Enlarge ( 1 ); {
vecvecdData[iOldSize].Init ( vecdData.Size () ); /* add ID and data */
vecvecdData[iOldSize] = vecdData; vecChanID.Add ( i );
/* send message for get status (for GUI) */ const int iOldSize = vecvecdData.Size ();
if ( bGetOK ) vecvecdData.Enlarge ( 1 );
{ vecvecdData[iOldSize].Init ( vecdData.Size () );
PostWinMessage ( MS_JIT_BUF_GET, MUL_COL_LED_GREEN, i ); vecvecdData[iOldSize] = vecdData;
}
else /* send message for get status (for GUI) */
{ if ( bGetOK )
PostWinMessage ( MS_JIT_BUF_GET, MUL_COL_LED_RED, i ); {
PostWinMessage ( MS_JIT_BUF_GET, MUL_COL_LED_GREEN, i );
}
else
{
PostWinMessage ( MS_JIT_BUF_GET, MUL_COL_LED_RED, i );
}
} }
} }
} }
Mutex.unlock (); /* release mutex */ Mutex.unlock (); /* release mutex */
} }
@ -221,20 +227,6 @@ void CChannelSet::GetConCliParam ( CVector<CHostAddress>& vecHostAddresses,
} }
} }
void CChannelSet::SetSockBufSize ( const int iNewBlockSize, const int iNumBlocks )
{
/* this opperation must be done with mutex */
Mutex.lock ();
/* as a test we adjust the buffers of all channels to the new value. Maybe later
do change only for some channels -> take care to set value back to default if
channel is disconnected, afterwards! */
for ( int i = 0; i < MAX_NUM_CHANNELS; i++ )
vecChannels[i].SetSockBufSize ( iNewBlockSize, iNumBlocks );
Mutex.unlock ();
}
/******************************************************************************\ /******************************************************************************\
* CChannel * * CChannel *
@ -286,9 +278,9 @@ void CChannel::SetSockBufSize ( const int iNewBlockSize, const int iNumBlocks )
{ {
/* this opperation must be done with mutex */ /* this opperation must be done with mutex */
Mutex.lock (); Mutex.lock ();
{
SockBuf.Init ( iNewBlockSize, iNumBlocks ); SockBuf.Init ( iNewBlockSize, iNumBlocks );
}
Mutex.unlock (); Mutex.unlock ();
} }
@ -314,10 +306,10 @@ bool CChannel::GetAddress(CHostAddress& RetAddr)
} }
} }
bool CChannel::PutData ( const CVector<unsigned char>& vecbyData, EPutDataStat CChannel::PutData ( const CVector<unsigned char>& vecbyData,
int iNumBytes ) int iNumBytes )
{ {
bool bRet = true; EPutDataStat eRet = PS_GEN_ERROR;
/* only process if packet has correct size */ /* only process if packet has correct size */
if ( iNumBytes == iAudComprSize ) if ( iNumBytes == iAudComprSize )
@ -342,41 +334,61 @@ for (int i = 0; i < BLOCK_SIZE_SAMPLES; i++)
Mutex.lock (); /* put mutex lock */ Mutex.lock (); /* put mutex lock */
{
bRet = SockBuf.Put ( vecdResOutData ); if ( SockBuf.Put ( vecdResOutData ) )
{
eRet = PS_AUDIO_OK;
}
else
{
eRet = PS_AUDIO_ERR;
}
}
Mutex.unlock (); /* put mutex unlock */ Mutex.unlock (); /* put mutex unlock */
/* reset time-out counter */ // if channel was not connected, emit signal to inform that new connection
// was established
if ( iConTimeOut == 0 )
{
emit NewConnection();
}
// reset time-out counter
iConTimeOut = CON_TIME_OUT_CNT_MAX; iConTimeOut = CON_TIME_OUT_CNT_MAX;
} }
else else
{ {
// this seems not to be an audio block, parse the message // this seems not to be an audio block, parse the message
if ( Protocol.ParseMessage ( vecbyData, iNumBytes ) )
// TODO: different return code for protocol {
eRet = PS_PROT_OK;
bRet = Protocol.ParseMessage ( vecbyData, iNumBytes ); }
else
{
eRet = PS_PROT_ERR;
}
} }
return bRet; return eRet;
} }
bool CChannel::GetData ( CVector<double>& vecdData ) bool CChannel::GetData ( CVector<double>& vecdData )
{ {
bool bGetOK = false;
Mutex.lock (); /* get mutex lock */ Mutex.lock (); /* get mutex lock */
const bool bGetOK = SockBuf.Get ( vecdData );
if ( !bGetOK )
{ {
/* decrease time-out counter */ bGetOK = SockBuf.Get ( vecdData );
if ( iConTimeOut > 0 )
if ( !bGetOK )
{ {
iConTimeOut--; /* decrease time-out counter */
if ( iConTimeOut > 0 )
{
iConTimeOut--;
}
} }
} }
Mutex.unlock (); /* get mutex unlock */ Mutex.unlock (); /* get mutex unlock */
return bGetOK; return bGetOK;

View file

@ -48,53 +48,42 @@
/* no valid channel number */ /* no valid channel number */
#define INVALID_CHANNEL_ID (MAX_NUM_CHANNELS + 1) #define INVALID_CHANNEL_ID (MAX_NUM_CHANNELS + 1)
enum EPutDataStat
/* Classes ********************************************************************/
class CSampleOffsetEst
{ {
public: PS_GEN_ERROR,
CSampleOffsetEst() {Init();} PS_AUDIO_OK,
virtual ~CSampleOffsetEst() {} PS_AUDIO_ERR,
PS_PROT_OK,
void Init(); PS_PROT_ERR
void AddTimeStampIdx(const int iTimeStampIdx);
double GetSamRate() {return dSamRateEst;}
protected:
QTime RefTime;
int iAccTiStVal;
double dSamRateEst;
CVector<long int> veciTimeElapsed;
CVector<long int> veciTiStIdx;
int iInitCnt;
}; };
/* Classes ********************************************************************/
/* CChannel ----------------------------------------------------------------- */ /* CChannel ----------------------------------------------------------------- */
class CChannel : public QObject class CChannel : public QObject
{ {
Q_OBJECT Q_OBJECT
public: public:
CChannel (); CChannel();
virtual ~CChannel () {} virtual ~CChannel() {}
bool PutData ( const CVector<unsigned char>& vecbyData, EPutDataStat PutData ( const CVector<unsigned char>& vecbyData,
int iNumBytes ); int iNumBytes );
bool GetData ( CVector<double>& vecdData ); bool GetData ( CVector<double>& vecdData );
CVector<unsigned char> PrepSendPacket ( const CVector<short>& vecsNPacket ); CVector<unsigned char> PrepSendPacket ( const CVector<short>& vecsNPacket );
bool IsConnected () const { return iConTimeOut > 0; } bool IsConnected() const { return iConTimeOut > 0; }
int GetComprAudSize () { return iAudComprSize; } int GetComprAudSize() { return iAudComprSize; }
void SetAddress ( const CHostAddress NAddr ) { InetAddr = NAddr; } void SetAddress ( const CHostAddress NAddr ) { InetAddr = NAddr; }
bool GetAddress ( CHostAddress& RetAddr ); bool GetAddress ( CHostAddress& RetAddr );
CHostAddress GetAddress () { return InetAddr; } CHostAddress GetAddress () { return InetAddr; }
void SetSockBufSize ( const int iNewBlockSize, const int iNumBlocks ); void SetSockBufSize ( const int iNewBlockSize, const int iNumBlocks );
int GetSockBufSize () { return SockBuf.GetSize(); } int GetSockBufSize() { return SockBuf.GetSize(); }
// network protocol interface // network protocol interface
void CreateJitBufMes ( const int iJitBufSize ) void CreateJitBufMes ( const int iJitBufSize )
@ -139,6 +128,7 @@ public slots:
signals: signals:
void MessReadyForSending ( CVector<uint8_t> vecMessage ); void MessReadyForSending ( CVector<uint8_t> vecMessage );
void NewConnection();
}; };
@ -151,28 +141,29 @@ public:
CChannelSet(); CChannelSet();
virtual ~CChannelSet() {} virtual ~CChannelSet() {}
bool PutData(const CVector<unsigned char>& vecbyRecBuf, bool PutData ( const CVector<unsigned char>& vecbyRecBuf,
const int iNumBytesRead, const CHostAddress& HostAdr); const int iNumBytesRead, const CHostAddress& HostAdr );
int GetFreeChan(); int GetFreeChan();
int CheckAddr(const CHostAddress& Addr);
void GetBlockAllConC(CVector<int>& vecChanID, int CheckAddr ( const CHostAddress& Addr );
CVector<CVector<double> >& vecvecdData);
void GetConCliParam(CVector<CHostAddress>& vecHostAddresses, void GetBlockAllConC ( CVector<int>& vecChanID,
CVector<int>& veciJitBufSize); CVector<CVector<double> >& vecvecdData );
void GetConCliParam( CVector<CHostAddress>& vecHostAddresses,
CVector<int>& veciJitBufSize );
/* access functions for actual channels */ /* access functions for actual channels */
bool IsConnected(const int iChanNum) bool IsConnected ( const int iChanNum )
{return vecChannels[iChanNum].IsConnected();} { return vecChannels[iChanNum].IsConnected(); }
CVector<unsigned char> PrepSendPacket(const int iChanNum,
const CVector<short>& vecsNPacket)
{return vecChannels[iChanNum].PrepSendPacket(vecsNPacket);}
CHostAddress GetAddress(const int iChanNum)
{return vecChannels[iChanNum].GetAddress();}
void SetSockBufSize ( const int iNewBlockSize, const int iNumBlocks); CVector<unsigned char> PrepSendPacket ( const int iChanNum,
int GetSockBufSize() {return vecChannels[0].GetSockBufSize();} const CVector<short>& vecsNPacket )
{ return vecChannels[iChanNum].PrepSendPacket ( vecsNPacket ); }
CHostAddress GetAddress ( const int iChanNum )
{ return vecChannels[iChanNum].GetAddress(); }
protected: protected:
/* do not use the vector class since CChannel does not have appropriate /* do not use the vector class since CChannel does not have appropriate
@ -202,4 +193,25 @@ signals:
}; };
/* Sample rate offset estimation -------------------------------------------- */
class CSampleOffsetEst
{
public:
CSampleOffsetEst() { Init(); }
virtual ~CSampleOffsetEst() {}
void Init();
void AddTimeStampIdx ( const int iTimeStampIdx );
double GetSamRate() { return dSamRateEst; }
protected:
QTime RefTime;
int iAccTiStVal;
double dSamRateEst;
CVector<long int> veciTimeElapsed;
CVector<long int> veciTiStIdx;
int iInitCnt;
};
#endif /* !defined(CHANNEL_HOIH9345KJH98_3_4344_BB23945IUHF1912__INCLUDED_) */ #endif /* !defined(CHANNEL_HOIH9345KJH98_3_4344_BB23945IUHF1912__INCLUDED_) */

View file

@ -69,12 +69,6 @@ CLlconServerDlg::CLlconServerDlg ( CServer* pNServP, QWidget* parent,
#endif #endif
} }
/* Init slider control */
SliderNetBuf->setRange(1, MAX_NET_BUF_SIZE_NUM_BL);
const int iCurNumNetBuf = pServer->GetChannelSet()->GetSockBufSize();
SliderNetBuf->setValue(iCurNumNetBuf);
TextNetBuf->setText("Size: " + QString().setNum(iCurNumNetBuf));
/* Init timing jitter text label */ /* Init timing jitter text label */
TextLabelResponseTime->setText(""); TextLabelResponseTime->setText("");
@ -93,10 +87,6 @@ CLlconServerDlg::CLlconServerDlg ( CServer* pNServP, QWidget* parent,
/* timers */ /* timers */
QObject::connect(&Timer, SIGNAL(timeout()), this, SLOT(OnTimer())); QObject::connect(&Timer, SIGNAL(timeout()), this, SLOT(OnTimer()));
/* sliders */
QObject::connect(SliderNetBuf, SIGNAL(valueChanged(int)),
this, SLOT(OnSliderNetBuf(int)));
/* timers --------------------------------------------------------------- */ /* timers --------------------------------------------------------------- */
/* start timer for GUI controls */ /* start timer for GUI controls */
@ -158,12 +148,6 @@ void CLlconServerDlg::OnTimer()
} }
} }
void CLlconServerDlg::OnSliderNetBuf(int value)
{
pServer->GetChannelSet()->SetSockBufSize( MIN_BLOCK_SIZE_SAMPLES, value );
TextNetBuf->setText("Size: " + QString().setNum(value));
}
void CLlconServerDlg::customEvent(QCustomEvent* Event) void CLlconServerDlg::customEvent(QCustomEvent* Event)
{ {
if (Event->type() == QEvent::User + 11) if (Event->type() == QEvent::User + 11)

View file

@ -74,5 +74,4 @@ protected:
public slots: public slots:
void OnTimer(); void OnTimer();
void OnSliderNetBuf(int value);
}; };

View file

@ -11,7 +11,7 @@
<rect> <rect>
<x>0</x> <x>0</x>
<y>0</y> <y>0</y>
<width>633</width> <width>629</width>
<height>240</height> <height>240</height>
</rect> </rect>
</property> </property>
@ -37,110 +37,35 @@
<number>6</number> <number>6</number>
</property> </property>
<widget> <widget>
<class>QLayoutWidget</class> <class>QListView</class>
<column>
<property>
<name>text</name>
<string>Column 1</string>
</property>
<property>
<name>clickable</name>
<bool>true</bool>
</property>
<property>
<name>resizeable</name>
<bool>true</bool>
</property>
</column>
<item>
<property>
<name>text</name>
<string>New Item</string>
</property>
<property>
<name>pixmap</name>
<pixmap></pixmap>
</property>
</item>
<property stdset="1"> <property stdset="1">
<name>name</name> <name>name</name>
<cstring>Layout4</cstring> <cstring>ListViewClients</cstring>
</property> </property>
<hbox>
<property stdset="1">
<name>margin</name>
<number>0</number>
</property>
<property stdset="1">
<name>spacing</name>
<number>6</number>
</property>
<widget>
<class>QListView</class>
<column>
<property>
<name>text</name>
<string>Column 1</string>
</property>
<property>
<name>clickable</name>
<bool>true</bool>
</property>
<property>
<name>resizeable</name>
<bool>true</bool>
</property>
</column>
<item>
<property>
<name>text</name>
<string>New Item</string>
</property>
<property>
<name>pixmap</name>
<pixmap></pixmap>
</property>
</item>
<property stdset="1">
<name>name</name>
<cstring>ListViewClients</cstring>
</property>
</widget>
<widget>
<class>QGroupBox</class>
<property stdset="1">
<name>name</name>
<cstring>GroupBox3</cstring>
</property>
<property stdset="1">
<name>title</name>
<string>Jitter Buffer</string>
</property>
<vbox>
<property stdset="1">
<name>margin</name>
<number>11</number>
</property>
<property stdset="1">
<name>spacing</name>
<number>6</number>
</property>
<widget>
<class>QLabel</class>
<property stdset="1">
<name>name</name>
<cstring>TextNetBuf</cstring>
</property>
<property stdset="1">
<name>text</name>
<string>Size</string>
</property>
<property stdset="1">
<name>alignment</name>
<set>AlignCenter</set>
</property>
<property>
<name>hAlign</name>
</property>
</widget>
<widget>
<class>QSlider</class>
<property stdset="1">
<name>name</name>
<cstring>SliderNetBuf</cstring>
</property>
<property stdset="1">
<name>pageStep</name>
<number>1</number>
</property>
<property stdset="1">
<name>orientation</name>
<enum>Vertical</enum>
</property>
<property stdset="1">
<name>tickmarks</name>
<enum>Both</enum>
</property>
</widget>
</vbox>
</widget>
</hbox>
</widget> </widget>
<widget> <widget>
<class>QLayoutWidget</class> <class>QLayoutWidget</class>

View file

@ -72,14 +72,6 @@ MESSAGES
/* Implementation *************************************************************/ /* Implementation *************************************************************/
// TODO take care of mutexing ressources!!!!!!
CProtocol::CProtocol() : iCounter ( 0 ), iOldRecID ( PROTMESSID_ILLEGAL ), CProtocol::CProtocol() : iCounter ( 0 ), iOldRecID ( PROTMESSID_ILLEGAL ),
iOldRecCnt ( 0 ) iOldRecCnt ( 0 )
{ {
@ -132,98 +124,6 @@ void CProtocol::SendMessage()
} }
} }
void CProtocol::DeleteSendMessQueue()
{
// delete complete "send message queue"
SendMessQueue.clear();
}
bool CProtocol::ParseMessage ( const CVector<unsigned char>& vecbyData,
const int iNumBytes )
{
/*
return code: true -> ok; false -> error
*/
int iRecCounter, iRecID, iData;
unsigned int iPos;
CVector<uint8_t> vecData;
// convert unsigned char in uint8_t, TODO convert all buffers in uint8_t
CVector<uint8_t> vecbyDataConv ( iNumBytes );
for ( int i = 0; i < iNumBytes; i++ ) {
vecbyDataConv[i] = static_cast<uint8_t> ( vecbyData[i] );
}
// important: vecbyDataConv must have iNumBytes to get it work!!!
if ( ParseMessageFrame ( vecbyDataConv, iRecCounter, iRecID, vecData ) )
{
// In case we received a message and returned an answer but our answer
// did not make it to the receiver, he will resend his message. We check
// here if the message is the same as the old one, and if this is the
// case, just resend our old answer again
if ( ( iOldRecID == iRecID ) && ( iOldRecCnt == iRecCounter ) )
{
// acknowledgments are not acknowledged
if ( iRecID != PROTMESSID_ACKN )
{
// re-send acknowledgement
CreateAndSendAcknMess ( iRecID, iRecCounter );
}
}
else
{
// check which type of message we received and do action
switch ( iRecID )
{
case PROTMESSID_ACKN:
// extract data from stream and emit signal for received value
iPos = 0;
iData = static_cast<int> ( GetValFromStream ( vecData, iPos, 2 ) );
// check if this is the correct acknowledgment
if ( ( SendMessQueue.front().iCnt == iRecCounter ) &&
( SendMessQueue.front().iID == iData ) )
{
// message acknowledged, remove from queue
SendMessQueue.pop_front();
// send next message in queue
SendMessage();
}
break;
case PROTMESSID_JITT_BUF_SIZE:
// extract data from stream and emit signal for received value
iPos = 0;
iData = static_cast<int> ( GetValFromStream ( vecData, iPos, 2 ) );
// invoke message action
emit ChangeJittBufSize ( iData );
// send acknowledge message
CreateAndSendAcknMess ( iRecID, iRecCounter );
break;
}
}
// save current message ID and counter to find out if message was re-sent
iOldRecID = iRecID;
iOldRecCnt = iRecCounter;
return true; // everything was ok
}
else
{
return false; // return error code
}
}
void CProtocol::CreateAndSendAcknMess ( const int& iID, const int& iCnt ) void CProtocol::CreateAndSendAcknMess ( const int& iID, const int& iCnt )
{ {
CVector<uint8_t> vecAcknMessage; CVector<uint8_t> vecAcknMessage;
@ -240,46 +140,145 @@ void CProtocol::CreateAndSendAcknMess ( const int& iID, const int& iCnt )
emit MessReadyForSending ( vecAcknMessage ); emit MessReadyForSending ( vecAcknMessage );
} }
void CProtocol::CreateJitBufMes ( const int iJitBufSize )
/*
The following functions are access functions from different threads. These
functions have to be secured by a mutex to avoid data corruption
*/
void CProtocol::DeleteSendMessQueue()
{ {
CVector<uint8_t> vecNewMessage; Mutex.lock();
CVector<uint8_t> vecData ( 2 ); // 2 bytes of data {
unsigned int iPos = 0; // init position pointer // delete complete "send message queue"
SendMessQueue.clear();
}
Mutex.unlock();
}
// store current counter value void CProtocol::OnTimerSendMess()
const int iCurCounter = iCounter; {
Mutex.lock();
{
SendMessage();
}
Mutex.unlock();
}
// increase counter (wraps around automatically) bool CProtocol::ParseMessage ( const CVector<unsigned char>& vecbyData,
// TODO: make it thread safe!!!!!!!!!!!! const int iNumBytes )
iCounter++; {
/*
return code: true -> ok; false -> error
*/
Mutex.lock();
{
int iRecCounter, iRecID, iData;
unsigned int iPos;
CVector<uint8_t> vecData;
// build data vector
PutValOnStream ( vecData, iPos, static_cast<uint32_t> ( iJitBufSize ), 2 );
// build complete message // convert unsigned char in uint8_t, TODO convert all buffers in uint8_t
GenMessageFrame ( vecNewMessage, iCurCounter, PROTMESSID_JITT_BUF_SIZE, vecData ); CVector<uint8_t> vecbyDataConv ( iNumBytes );
for ( int i = 0; i < iNumBytes; i++ ) {
// enqueue message vecbyDataConv[i] = static_cast<uint8_t> ( vecbyData[i] );
EnqueueMessage ( vecNewMessage, iCurCounter, PROTMESSID_JITT_BUF_SIZE );
} }
// important: vecbyDataConv must have iNumBytes to get it work!!!
if ( ParseMessageFrame ( vecbyDataConv, iRecCounter, iRecID, vecData ) )
{
// In case we received a message and returned an answer but our answer
// did not make it to the receiver, he will resend his message. We check
// here if the message is the same as the old one, and if this is the
// case, just resend our old answer again
if ( ( iOldRecID == iRecID ) && ( iOldRecCnt == iRecCounter ) )
{
// acknowledgments are not acknowledged
if ( iRecID != PROTMESSID_ACKN )
{
// re-send acknowledgement
CreateAndSendAcknMess ( iRecID, iRecCounter );
}
}
else
{
// check which type of message we received and do action
switch ( iRecID )
{
case PROTMESSID_ACKN:
// extract data from stream and emit signal for received value
iPos = 0;
iData = static_cast<int> ( GetValFromStream ( vecData, iPos, 2 ) );
// check if this is the correct acknowledgment
if ( ( SendMessQueue.front().iCnt == iRecCounter ) &&
( SendMessQueue.front().iID == iData ) )
{
// message acknowledged, remove from queue
SendMessQueue.pop_front();
// send next message in queue
SendMessage();
}
break;
case PROTMESSID_JITT_BUF_SIZE:
// extract data from stream and emit signal for received value
iPos = 0;
iData = static_cast<int> ( GetValFromStream ( vecData, iPos, 2 ) );
// invoke message action
emit ChangeJittBufSize ( iData );
// send acknowledge message
CreateAndSendAcknMess ( iRecID, iRecCounter );
break;
}
}
// save current message ID and counter to find out if message was re-sent
iOldRecID = iRecID;
iOldRecCnt = iRecCounter;
return true; // everything was ok
}
else
{
return false; // return error code
}
}
Mutex.unlock();
}
void CProtocol::CreateJitBufMes ( const int iJitBufSize )
{
Mutex.lock();
{
CVector<uint8_t> vecNewMessage;
CVector<uint8_t> vecData ( 2 ); // 2 bytes of data
unsigned int iPos = 0; // init position pointer
// store current counter value
const int iCurCounter = iCounter;
// increase counter (wraps around automatically)
iCounter++;
// build data vector
PutValOnStream ( vecData, iPos, static_cast<uint32_t> ( iJitBufSize ), 2 );
// build complete message
GenMessageFrame ( vecNewMessage, iCurCounter, PROTMESSID_JITT_BUF_SIZE, vecData );
// enqueue message
EnqueueMessage ( vecNewMessage, iCurCounter, PROTMESSID_JITT_BUF_SIZE );
}
Mutex.unlock();
}
/******************************************************************************\ /******************************************************************************\

View file

@ -118,11 +118,12 @@ protected:
uint8_t iCounter; uint8_t iCounter;
int iOldRecID, iOldRecCnt; int iOldRecID, iOldRecCnt;
std::list<CSendMessage> SendMessQueue; std::list<CSendMessage> SendMessQueue;
QTimer TimerSendMess; QTimer TimerSendMess;
QMutex Mutex; QMutex Mutex;
public slots: public slots:
void OnTimerSendMess() { SendMessage(); } void OnTimerSendMess();
signals: signals:
// transmitting // transmitting

View file

@ -105,13 +105,20 @@ void CSocket::OnDataReceived ()
return; return;
} }
if ( pChannel->PutData( vecbyRecBuf, iNumBytesRead ) ) switch ( pChannel->PutData( vecbyRecBuf, iNumBytesRead ) )
{ {
case PS_AUDIO_OK:
PostWinMessage ( MS_JIT_BUF_PUT, MUL_COL_LED_GREEN ); PostWinMessage ( MS_JIT_BUF_PUT, MUL_COL_LED_GREEN );
} break;
else
{ case PS_AUDIO_ERR:
case PS_GEN_ERROR:
PostWinMessage ( MS_JIT_BUF_PUT, MUL_COL_LED_RED ); PostWinMessage ( MS_JIT_BUF_PUT, MUL_COL_LED_RED );
break;
case PS_PROT_ERR:
PostWinMessage ( MS_JIT_BUF_PUT, MUL_COL_LED_YELLOW );
break;
} }
} }
else else