Added support for putting the receive socket in a separate thread. Unfortunately the current code does not work as expected. Some Qt warnings appear and after a while the receive socket does not receive any more network packets. Therefore a global macro is introduced to enable/disable the new code. Default is: disabled.

This commit is contained in:
Volker Fischer 2013-06-03 16:07:17 +00:00
parent b8e9fcc4a3
commit 6bdb82c65f
12 changed files with 483 additions and 374 deletions

View File

@ -102,10 +102,6 @@ QObject::connect ( &Protocol,
SIGNAL ( OpusSupported() ),
SIGNAL ( OpusSupported() ) );
QObject::connect ( &Protocol,
SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ),
SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ) );
QObject::connect ( &Protocol,
SIGNAL ( NetTranspPropsReceived ( CNetworkTransportProps ) ),
this, SLOT ( OnNetTranspPropsReceived ( CNetworkTransportProps ) ) );
@ -113,6 +109,14 @@ QObject::connect ( &Protocol,
QObject::connect ( &Protocol,
SIGNAL ( ReqNetTranspProps() ),
this, SLOT ( OnReqNetTranspProps() ) );
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
// this connection is intended for a thread transition if we have a
// separate socket thread running
QObject::connect ( this,
SIGNAL ( ParseMessageBody ( CVector<uint8_t>, int, int ) ),
this, SLOT ( OnParseMessageBody ( CVector<uint8_t>, int, int ) ) );
#endif
}
bool CChannel::ProtocolIsEnabled()
@ -151,60 +155,83 @@ void CChannel::SetAudioStreamProperties ( const EAudComprType eNewAudComprType,
const int iNewNetwFrameSizeFact,
const int iNewNumAudioChannels )
{
// this function is intended for the server (not the client)
QMutexLocker locker ( &Mutex );
/*
this function is intended for the server (not the client)
*/
// store new values
eAudioCompressionType = eNewAudComprType;
iNumAudioChannels = iNewNumAudioChannels;
iNetwFrameSize = iNewNetwFrameSize;
iNetwFrameSizeFact = iNewNetwFrameSizeFact;
CNetworkTransportProps NetworkTransportProps;
// init socket buffer
SockBuf.Init ( iNetwFrameSize, iCurSockBufNumFrames );
Mutex.lock();
{
// store new values
eAudioCompressionType = eNewAudComprType;
iNumAudioChannels = iNewNumAudioChannels;
iNetwFrameSize = iNewNetwFrameSize;
iNetwFrameSizeFact = iNewNetwFrameSizeFact;
// init conversion buffer
ConvBuf.Init ( iNetwFrameSize * iNetwFrameSizeFact );
// init socket buffer
SockBuf.Init ( iNetwFrameSize, iCurSockBufNumFrames );
// tell the server that audio coding has changed
CreateNetTranspPropsMessFromCurrentSettings();
// init conversion buffer
ConvBuf.Init ( iNetwFrameSize * iNetwFrameSizeFact );
// fill network transport properties struct
NetworkTransportProps =
GetNetworkTransportPropsFromCurrentSettings();
}
Mutex.unlock();
// tell the server about the new network settings
Protocol.CreateNetwTranspPropsMes ( NetworkTransportProps );
}
bool CChannel::SetSockBufNumFrames ( const int iNewNumFrames,
const bool bPreserve )
{
QMutexLocker locker ( &Mutex ); // this operation must be done with mutex
bool ReturnValue = true; // init with error
bool bCurDoAutoSockBufSize = false; // we have to init but init values does not matter
// first check for valid input parameter range
if ( ( iNewNumFrames >= MIN_NET_BUF_SIZE_NUM_BL ) &&
( iNewNumFrames <= MAX_NET_BUF_SIZE_NUM_BL ) )
Mutex.lock();
{
// only apply parameter if new parameter is different from current one
if ( iCurSockBufNumFrames != iNewNumFrames )
// first check for valid input parameter range
if ( ( iNewNumFrames >= MIN_NET_BUF_SIZE_NUM_BL ) &&
( iNewNumFrames <= MAX_NET_BUF_SIZE_NUM_BL ) )
{
// store new value
iCurSockBufNumFrames = iNewNumFrames;
// the network block size is a multiple of the minimum network
// block size
SockBuf.Init ( iNetwFrameSize, iNewNumFrames, bPreserve );
// only in case we are the server and auto jitter buffer setting is
// enabled, we have to report the current setting to the client
if ( bIsServer && bDoAutoSockBufSize )
// only apply parameter if new parameter is different from current one
if ( iCurSockBufNumFrames != iNewNumFrames )
{
// we cannot call the "CreateJitBufMes" function directly since
// this would give us problems with different threads (e.g. the
// timer thread) and the protocol mechanism (problem with
// qRegisterMetaType(), etc.)
emit ServerAutoSockBufSizeChange ( iNewNumFrames );
}
// store new value
iCurSockBufNumFrames = iNewNumFrames;
return false; // -> no error
// the network block size is a multiple of the minimum network
// block size
SockBuf.Init ( iNetwFrameSize, iNewNumFrames, bPreserve );
// store current auto socket buffer size setting in the mutex
// region since if we use the current parameter below in the
// if condition, it may have been changed in between the time
// when we have left the mutex region and entered the if
// condition
bCurDoAutoSockBufSize = bDoAutoSockBufSize;
ReturnValue = false; // -> no error
}
}
}
Mutex.unlock();
return true; // set error flag
// only in case there is no error, we are the server and auto jitter buffer
// setting is enabled, we have to report the current setting to the client
if ( !ReturnValue && bIsServer && bCurDoAutoSockBufSize )
{
// we cannot call the "CreateJitBufMes" function directly since
// this would give us problems with different threads (e.g. the
// timer thread) and the protocol mechanism (problem with
// qRegisterMetaType(), etc.)
emit ServerAutoSockBufSizeChange ( iNewNumFrames );
}
return ReturnValue; // set error flag
}
void CChannel::SetGain ( const int iChanID,
@ -373,7 +400,7 @@ void CChannel::OnNetTranspPropsReceived ( CNetworkTransportProps NetworkTranspor
// if old CELT codec is used, inform the client that the new OPUS codec
// is supported
if ( eAudioCompressionType != CT_OPUS )
if ( NetworkTransportProps.eAudioCodingType != CT_OPUS )
{
Protocol.CreateOpusSupportedMes();
}
@ -382,12 +409,15 @@ void CChannel::OnNetTranspPropsReceived ( CNetworkTransportProps NetworkTranspor
void CChannel::OnReqNetTranspProps()
{
CreateNetTranspPropsMessFromCurrentSettings();
// fill network transport properties struct from current settings and send it
Protocol.CreateNetwTranspPropsMes ( GetNetworkTransportPropsFromCurrentSettings() );
}
void CChannel::CreateNetTranspPropsMessFromCurrentSettings()
CNetworkTransportProps CChannel::GetNetworkTransportPropsFromCurrentSettings()
{
CNetworkTransportProps NetworkTransportProps (
// use current stored settings of the channel to fill the network transport
// properties structure
return CNetworkTransportProps (
iNetwFrameSize,
iNetwFrameSizeFact,
iNumAudioChannels,
@ -395,9 +425,6 @@ void CChannel::CreateNetTranspPropsMessFromCurrentSettings()
eAudioCompressionType,
0, // version of the codec
0 );
// send current network transport properties
Protocol.CreateNetwTranspPropsMes ( NetworkTransportProps );
}
void CChannel::Disconnect()
@ -415,45 +442,88 @@ void CChannel::Disconnect()
EPutDataStat CChannel::PutData ( const CVector<uint8_t>& vecbyData,
int iNumBytes )
{
EPutDataStat eRet = PS_GEN_ERROR;
/*
Note that this function might be called from a different thread (separate
Socket thread) and therefore we should not call functions which emit signals
themself directly but emit a signal here so that the thread transition is
done as early as possible.
This is the reason why "ParseMessageBody" is not called directly but through a
signal-slot mechanism.
*/
// init flags
bool bIsProtocolPacket = false;
bool bNewConnection = false;
// init return state
EPutDataStat eRet = PS_GEN_ERROR;
if ( bIsEnabled )
{
// first check if this is protocol data
// only use protocol data if protocol mechanism is enabled
if ( ProtocolIsEnabled() )
int iRecCounter;
int iRecID;
CVector<uint8_t> vecbyMesBodyData;
// init flag
bool bNewConnection = false;
// check if this is a protocol message by trying to parse the message
// frame
if ( !Protocol.ParseMessageFrame ( vecbyData,
iNumBytes,
vecbyMesBodyData,
iRecCounter,
iRecID ) )
{
// parse the message assuming this is a protocol message
if ( !Protocol.ParseMessage ( vecbyData, iNumBytes ) )
// This is a protocol message:
// only use protocol data if protocol mechanism is enabled
if ( ProtocolIsEnabled() )
{
// set status flags
eRet = PS_PROT_OK;
bIsProtocolPacket = true;
// in case this is a connection less message, we do not process it here
if ( Protocol.IsConnectionLessMessageID ( iRecID ) )
{
// fire a signal so that an other class can process this type of
// message
emit DetectedCLMessage ( vecbyMesBodyData, iRecID );
// set status flag
eRet = PS_PROT_OK;
}
else
{
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
// parse the message assuming this is a regular protocol message
emit ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID );
// note that protocol OK is not correct here since we do not
// check if the protocol was ok since we emit just a signal
// and do not get any feedback on the protocol decoding state
eRet = PS_PROT_OK;
#else
// parse the message assuming this is a protocol message
if ( !Protocol.ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID ) )
{
// set status flag
eRet = PS_PROT_OK;
}
#endif
}
}
else
{
// In case we are the server and the current channel is not
// connected, we do not evaluate protocol messages but these
// messages could start the server which is not desired,
// especially not for the disconnect messages.
// We now do not start the server if a valid protocol message
// was received but only start the server on audio packets.
// set status flag
eRet = PS_PROT_OK_MESS_NOT_EVALUATED;
}
}
else
{
// In case we are the server and the current channel is not
// connected, we do not evaluate protocal messages but these
// messages could start the server which is not desired, especially
// not for the disconnect messages.
// We now do not start the server if a valid protocol message
// was received but only start the server on audio packets
if ( Protocol.IsProtocolMessage ( vecbyData, iNumBytes ) )
{
// set status flags
eRet = PS_PROT_OK_MESS_NOT_EVALUATED;
bIsProtocolPacket = true;
}
}
// This seems to be an audio packet (only try to parse audio if it
// was not a protocol packet):
// only try to parse audio if it was not a protocol packet
if ( !bIsProtocolPacket )
{
Mutex.lock();
{
// only process audio if packet has correct size
@ -476,12 +546,12 @@ EPutDataStat CChannel::PutData ( const CVector<uint8_t>& vecbyData,
eRet = PS_PROT_ERR;
}
// all network packets except of valid protocol messages
// All network packets except of valid protocol messages
// regardless if they are valid or invalid audio packets lead to
// a state change to a connected channel
// this is because protocol messages can only be sent on a
// a state change to a connected channel.
// This is because protocol messages can only be sent on a
// connected channel and the client has to inform the server
// about the audio packet properties via the protocol
// about the audio packet properties via the protocol.
// check if channel was not connected, this is a new connection
// (do not fire an event directly since we are inside a mutex
@ -506,55 +576,62 @@ EPutDataStat CChannel::PutData ( const CVector<uint8_t>& vecbyData,
EGetDataStat CChannel::GetData ( CVector<uint8_t>& vecbyData )
{
QMutexLocker locker ( &Mutex );
EGetDataStat eGetStatus;
const bool bSockBufState = SockBuf.Get ( vecbyData );
// decrease time-out counter
if ( iConTimeOut > 0 )
Mutex.lock();
{
// subtract the number of samples of the current block since the
// time out counter is based on samples not on blocks (definition:
// always one atomic block is get by using the GetData() function
// where the atomic block size is "SYSTEM_FRAME_SIZE_SAMPLES")
// the socket access must be inside a mutex
const bool bSockBufState = SockBuf.Get ( vecbyData );
// decrease time-out counter
if ( iConTimeOut > 0 )
{
// subtract the number of samples of the current block since the
// time out counter is based on samples not on blocks (definition:
// always one atomic block is get by using the GetData() function
// where the atomic block size is "SYSTEM_FRAME_SIZE_SAMPLES")
// TODO this code only works with the above assumption -> better
// implementation so that we are not depending on assumptions
iConTimeOut -= SYSTEM_FRAME_SIZE_SAMPLES;
iConTimeOut -= SYSTEM_FRAME_SIZE_SAMPLES;
if ( iConTimeOut <= 0 )
{
// channel is just disconnected
eGetStatus = GS_CHAN_NOW_DISCONNECTED;
iConTimeOut = 0; // make sure we do not have negative values
// reset network transport properties
ResetNetworkTransportProperties();
// emit message
emit Disconnected();
}
else
{
if ( bSockBufState )
if ( iConTimeOut <= 0 )
{
// everything is ok
eGetStatus = GS_BUFFER_OK;
// channel is just disconnected
eGetStatus = GS_CHAN_NOW_DISCONNECTED;
iConTimeOut = 0; // make sure we do not have negative values
// reset network transport properties
ResetNetworkTransportProperties();
}
else
{
// channel is not yet disconnected but no data in buffer
eGetStatus = GS_BUFFER_UNDERRUN;
if ( bSockBufState )
{
// everything is ok
eGetStatus = GS_BUFFER_OK;
}
else
{
// channel is not yet disconnected but no data in buffer
eGetStatus = GS_BUFFER_UNDERRUN;
}
}
}
else
{
// channel is disconnected
eGetStatus = GS_CHAN_NOT_CONNECTED;
}
}
else
Mutex.unlock();
// in case we are just disconnected, we have to fire a message
if ( eGetStatus == GS_CHAN_NOW_DISCONNECTED )
{
// channel is disconnected
eGetStatus = GS_CHAN_NOT_CONNECTED;
// emit message
emit Disconnected();
}
return eGetStatus;

View File

@ -150,7 +150,7 @@ void CreateConClientListNameMes ( const CVector<CChannelInfo>& vecChanInfo )
void CreateConClientListMes ( const CVector<CChannelInfo>& vecChanInfo )
{ Protocol.CreateConClientListMes ( vecChanInfo ); }
void CreateNetTranspPropsMessFromCurrentSettings();
CNetworkTransportProps GetNetworkTransportPropsFromCurrentSettings();
protected:
bool ProtocolIsEnabled();
@ -209,6 +209,12 @@ public slots:
void OnNetTranspPropsReceived ( CNetworkTransportProps NetworkTransportProps );
void OnReqNetTranspProps();
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
void OnParseMessageBody ( CVector<uint8_t> vecbyMesBodyData,
int iRecCounter,
int iRecID ) { Protocol.ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID ); }
#endif
signals:
void MessReadyForSending ( CVector<uint8_t> vecMessage );
void NewConnection();
@ -225,8 +231,15 @@ signals:
void PingReceived ( int iMs );
void ReqNetTranspProps();
void Disconnected();
void DetectedCLMessage ( CVector<uint8_t> vecbyData,
int iNumBytes );
void DetectedCLMessage ( CVector<uint8_t> vecbyMesBodyData,
int iRecID );
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
void ParseMessageBody ( CVector<uint8_t> vecbyMesBodyData,
int iRecCounter,
int iRecID );
#endif
};
#endif /* !defined ( CHANNEL_HOIH9345KJH98_3_4344_BB23945IUHF1912__INCLUDED_ ) */

View File

@ -225,29 +225,29 @@ void CClient::OnInvalidPacketReceived ( CVector<uint8_t> vecbyRecBuf,
// e.g., open the connection setup dialog since then we are not
// yet connected but talk to the central server with the
// connection less protocol)
if ( ConnLessProtocol.ParseConnectionLessMessage ( vecbyRecBuf,
iNumBytesRead,
RecHostAddr ) )
if ( ConnLessProtocol.ParseConnectionLessMessageWithFrame ( vecbyRecBuf,
iNumBytesRead,
RecHostAddr ) )
{
// message coult not be parsed, check if the packet comes
// from the server we just connected -> if yes, send
// disconnect message since the server may not know that we
// are not connected anymore
if ( Channel. GetAddress() == RecHostAddr )
if ( Channel.GetAddress() == RecHostAddr )
{
ConnLessProtocol.CreateCLDisconnection ( RecHostAddr );
}
}
}
void CClient::OnDetectedCLMessage ( CVector<uint8_t> vecbyData,
int iNumBytes )
void CClient::OnDetectedCLMessage ( CVector<uint8_t> vecbyMesBodyData,
int iRecID )
{
// this is a special case: we received a connection less message but we are
// in a connection
ConnLessProtocol.ParseConnectionLessMessage ( vecbyData,
iNumBytes,
Channel.GetAddress() );
ConnLessProtocol.ParseConnectionLessMessageBody ( vecbyMesBodyData,
iRecID,
Channel.GetAddress() );
}
void CClient::OnJittBufSizeChanged ( int iNewJitBufSize )

View File

@ -301,7 +301,11 @@ void SetAudoCompressiontype ( const EAudComprType eNAudCompressionType );
bool bIsInitializationPhase;
CVector<unsigned char> vecCeltData;
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
CHighPrioSocket Socket;
#else
CSocket Socket;
#endif
CSound Sound;
CStereoSignalLevelMeter SignalLevelMeter;
@ -350,7 +354,7 @@ public slots:
void OnInvalidPacketReceived ( CVector<uint8_t> vecbyRecBuf,
int iNumBytesRead,
CHostAddress RecHostAddr );
void OnDetectedCLMessage ( CVector<uint8_t> vecbyData, int iNumBytes );
void OnDetectedCLMessage ( CVector<uint8_t> vecbyMesBodyData, int iRecID );
void OnReqJittBufSize() { CreateServerJitterBufferMessage(); }
void OnJittBufSizeChanged ( int iNewJitBufSize );
void OnReqChanInfo() { Channel.SetRemoteInfo ( ChannelInfo ); }

View File

@ -324,17 +324,8 @@ CClientDlg::CClientDlg ( CClient* pNCliP,
// View menu --------------------------------------------------------------
pViewMenu = new QMenu ( "&View", this );
// TODO This View menu item is disabled for now. The reason for that is that if
// if the connection setup dialog is opened during an active connection, we will
// get audio drop outs everytime the ping times in the list are updated which is
// not acceptable.
// Fix the issue with the thread priorities and enable the menu item again...
/*
pViewMenu->addAction ( tr ( "&Connection Setup..." ), this,
SLOT ( OnOpenConnectionSetupDialog() ) );
*/
pViewMenu->addAction ( tr ( "C&hat..." ), this,
SLOT ( OnOpenChatDialog() ) );

View File

@ -65,6 +65,24 @@ LED bar: lbr
/* Definitions ****************************************************************/
// The receive socket should be put in a high priority thread to ensure the GUI
// does not effect the stability of the audio stream (e.g. if the GUI is on
// high load because of a table update, the incoming network packets must still
// be put in the jitter buffer with highest priority).
// Unfortunately, the current implementation of the separate thread does not
// work correctly. Therefore we introduced this macro to enable/disable the new
// code. If the new code is enabled, under Windows Qt warnings like
// "QSocketNotifier: Multiple socket notifiers for same socket" appear and after
// a while the receive socket does not receive any new network packets anymore.
// This has to be solved!
//#define ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
#undef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
// define this macro to get debug output
//#define _DEBUG_
#undef _DEBUG_

View File

@ -372,9 +372,17 @@ int main ( int argc, char** argv )
QApplication app ( argc, argv, bUseGUI );
#ifdef _WIN32
// Set application priority class -> high priority
// set application priority class -> high priority
SetPriorityClass ( GetCurrentProcess(), HIGH_PRIORITY_CLASS );
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
// Since we have a higher priority for the entire application, we can give
// the GUI thread the lowest possible priority which is still above the
// Windows "normal" priority so that it does not slow down the sound card
// and network working threads.
QThread::currentThread()->setPriority ( QThread::LowestPriority );
#endif
// For accessible support we need to add a plugin to qt. The plugin has to
// be located in the install directory of the software by the installer.
// Here, we set the path to our application path.

View File

@ -425,176 +425,168 @@ void CProtocol::CreateAndImmSendConLessMessage ( const int iID,
emit CLMessReadyForSending ( InetAddr, vecNewMessage );
}
bool CProtocol::IsProtocolMessage ( const CVector<uint8_t>& vecbyData,
const int iNumBytes )
{
/*
just check if this is a protocol message but do not act on message
*/
int iRecCounter, iRecID;
CVector<uint8_t> vecData;
return !ParseMessageFrame ( vecbyData,
iNumBytes,
iRecCounter,
iRecID,
vecData );
}
bool CProtocol::ParseMessage ( const CVector<uint8_t>& vecbyData,
const int iNumBytes )
bool CProtocol::ParseMessageBody ( const CVector<uint8_t>& vecbyMesBodyData,
const int iRecCounter,
const int iRecID )
{
/*
return code: false -> ok; true -> error
*/
bool bRet = false;
bool bSendNextMess;
int iRecCounter, iRecID;
CVector<uint8_t> vecData;
if ( !ParseMessageFrame ( vecbyData, iNumBytes, iRecCounter, iRecID, vecData ) )
{
bool bRet = false;
bool bSendNextMess;
/*
// TEST channel implementation: randomly delete protocol messages (50 % loss)
if ( rand() < ( RAND_MAX / 2 ) ) return false;
*/
// in case this is a connection less message, we do not process it here
if ( IsConnectionLessMessageID ( iRecID ) )
// In case we received a message and returned an answer but our answer
// did not make it to the receiver, he will resend his message. We check
// here if the message is the same as the old one, and if this is the
// case, just resend our old answer again
if ( ( iOldRecID == iRecID ) && ( iOldRecCnt == iRecCounter ) )
{
// acknowledgments are not acknowledged
if ( iRecID != PROTMESSID_ACKN )
{
// fire a signal so that an other class can process this type of
// message
emit DetectedCLMessage ( vecbyData, iNumBytes );
// return function without issuing an error code (since it is a
// regular message but will just not processed here)
return false;
// resend acknowledgement
CreateAndImmSendAcknMess ( iRecID, iRecCounter );
}
// In case we received a message and returned an answer but our answer
// did not make it to the receiver, he will resend his message. We check
// here if the message is the same as the old one, and if this is the
// case, just resend our old answer again
if ( ( iOldRecID == iRecID ) && ( iOldRecCnt == iRecCounter ) )
}
else
{
// special treatment for acknowledge messages
if ( iRecID == PROTMESSID_ACKN )
{
// acknowledgments are not acknowledged
if ( iRecID != PROTMESSID_ACKN )
// extract data from stream and emit signal for received value
int iPos = 0;
const int iData =
static_cast<int> ( GetValFromStream ( vecbyMesBodyData, iPos, 2 ) );
Mutex.lock();
{
// resend acknowledgement
CreateAndImmSendAcknMess ( iRecID, iRecCounter );
// check if this is the correct acknowledgment
bSendNextMess = false;
if ( !SendMessQueue.empty() )
{
if ( ( SendMessQueue.front().iCnt == iRecCounter ) &&
( SendMessQueue.front().iID == iData ) )
{
// message acknowledged, remove from queue
SendMessQueue.pop_front();
// send next message in queue
bSendNextMess = true;
}
}
}
Mutex.unlock();
if ( bSendNextMess )
{
SendMessage();
}
}
else
{
// special treatment for acknowledge messages
if ( iRecID == PROTMESSID_ACKN )
// check which type of message we received and do action
switch ( iRecID )
{
// extract data from stream and emit signal for received value
int iPos = 0;
const int iData =
static_cast<int> ( GetValFromStream ( vecData, iPos, 2 ) );
case PROTMESSID_JITT_BUF_SIZE:
bRet = EvaluateJitBufMes ( vecbyMesBodyData );
break;
Mutex.lock();
{
// check if this is the correct acknowledgment
bSendNextMess = false;
if ( !SendMessQueue.empty() )
{
if ( ( SendMessQueue.front().iCnt == iRecCounter ) &&
( SendMessQueue.front().iID == iData ) )
{
// message acknowledged, remove from queue
SendMessQueue.pop_front();
case PROTMESSID_REQ_JITT_BUF_SIZE:
bRet = EvaluateReqJitBufMes();
break;
// send next message in queue
bSendNextMess = true;
}
}
}
Mutex.unlock();
if ( bSendNextMess )
{
SendMessage();
}
}
else
{
// check which type of message we received and do action
switch ( iRecID )
{
case PROTMESSID_JITT_BUF_SIZE:
bRet = EvaluateJitBufMes ( vecData );
break;
case PROTMESSID_REQ_JITT_BUF_SIZE:
bRet = EvaluateReqJitBufMes();
break;
case PROTMESSID_CHANNEL_GAIN:
bRet = EvaluateChanGainMes ( vecData );
break;
case PROTMESSID_CHANNEL_GAIN:
bRet = EvaluateChanGainMes ( vecbyMesBodyData );
break;
// #### COMPATIBILITY OLD VERSION, TO BE REMOVED ####
case PROTMESSID_CONN_CLIENTS_LIST_NAME:
bRet = EvaluateConClientListNameMes ( vecData );
bRet = EvaluateConClientListNameMes ( vecbyMesBodyData );
break;
case PROTMESSID_CONN_CLIENTS_LIST:
bRet = EvaluateConClientListMes ( vecData );
break;
case PROTMESSID_CONN_CLIENTS_LIST:
bRet = EvaluateConClientListMes ( vecbyMesBodyData );
break;
case PROTMESSID_REQ_CONN_CLIENTS_LIST:
bRet = EvaluateReqConnClientsList();
break;
case PROTMESSID_REQ_CONN_CLIENTS_LIST:
bRet = EvaluateReqConnClientsList();
break;
// #### COMPATIBILITY OLD VERSION, TO BE REMOVED ####
case PROTMESSID_CHANNEL_NAME:
bRet = EvaluateChanNameMes ( vecData );
bRet = EvaluateChanNameMes ( vecbyMesBodyData );
break;
case PROTMESSID_CHANNEL_INFOS:
bRet = EvaluateChanInfoMes ( vecData );
break;
case PROTMESSID_CHANNEL_INFOS:
bRet = EvaluateChanInfoMes ( vecbyMesBodyData );
break;
case PROTMESSID_REQ_CHANNEL_INFOS:
bRet = EvaluateReqChanInfoMes();
break;
case PROTMESSID_REQ_CHANNEL_INFOS:
bRet = EvaluateReqChanInfoMes();
break;
case PROTMESSID_CHAT_TEXT:
bRet = EvaluateChatTextMes ( vecData );
break;
case PROTMESSID_CHAT_TEXT:
bRet = EvaluateChatTextMes ( vecbyMesBodyData );
break;
// #### COMPATIBILITY OLD VERSION, TO BE REMOVED ####
case PROTMESSID_PING_MS:
bRet = EvaluatePingMes ( vecData );
bRet = EvaluatePingMes ( vecbyMesBodyData );
break;
case PROTMESSID_NETW_TRANSPORT_PROPS:
bRet = EvaluateNetwTranspPropsMes ( vecData );
break;
case PROTMESSID_NETW_TRANSPORT_PROPS:
bRet = EvaluateNetwTranspPropsMes ( vecbyMesBodyData );
break;
case PROTMESSID_REQ_NETW_TRANSPORT_PROPS:
bRet = EvaluateReqNetwTranspPropsMes();
break;
case PROTMESSID_REQ_NETW_TRANSPORT_PROPS:
bRet = EvaluateReqNetwTranspPropsMes();
break;
// #### COMPATIBILITY OLD VERSION, TO BE REMOVED ####
case PROTMESSID_OPUS_SUPPORTED:
bRet = EvaluateOpusSupportedMes();
break;
}
// immediately send acknowledge message
CreateAndImmSendAcknMess ( iRecID, iRecCounter );
// save current message ID and counter to find out if message
// was resent
iOldRecID = iRecID;
iOldRecCnt = iRecCounter;
}
// immediately send acknowledge message
CreateAndImmSendAcknMess ( iRecID, iRecCounter );
// save current message ID and counter to find out if message
// was resent
iOldRecID = iRecID;
iOldRecCnt = iRecCounter;
}
}
return bRet;
}
bool CProtocol::ParseConnectionLessMessageWithFrame ( const CVector<uint8_t>& vecbyData,
const int iNumBytesIn,
const CHostAddress& InetAddr )
{
/*
return code: false -> ok; true -> error
*/
bool bRet = false;
int iRecCounter, iRecID;
CVector<uint8_t> vecbyMesBodyData;
if ( !ParseMessageFrame ( vecbyData,
iNumBytesIn,
vecbyMesBodyData,
iRecCounter,
iRecID ) )
{
bRet = ParseConnectionLessMessageBody ( vecbyMesBodyData,
iRecID,
InetAddr );
}
else
{
bRet = true; // return error code
@ -603,70 +595,60 @@ case PROTMESSID_OPUS_SUPPORTED:
return bRet;
}
bool CProtocol::ParseConnectionLessMessage ( const CVector<uint8_t>& vecbyData,
const int iNumBytes,
const CHostAddress& InetAddr )
bool CProtocol::ParseConnectionLessMessageBody ( const CVector<uint8_t>& vecbyMesBodyData,
const int iRecID,
const CHostAddress& InetAddr )
{
/*
return code: false -> ok; true -> error
*/
bool bRet = false;
int iRecCounter, iRecID;
CVector<uint8_t> vecData;
if ( !ParseMessageFrame ( vecbyData, iNumBytes, iRecCounter, iRecID, vecData ) )
{
bool bRet = false;
/*
// TEST channel implementation: randomly delete protocol messages (50 % loss)
if ( rand() < ( RAND_MAX / 2 ) ) return false;
*/
if ( IsConnectionLessMessageID ( iRecID ) )
if ( IsConnectionLessMessageID ( iRecID ) )
{
// check which type of message we received and do action
switch ( iRecID )
{
// check which type of message we received and do action
switch ( iRecID )
{
case PROTMESSID_CLM_PING_MS:
bRet = EvaluateCLPingMes ( InetAddr, vecData );
break;
case PROTMESSID_CLM_PING_MS:
bRet = EvaluateCLPingMes ( InetAddr, vecbyMesBodyData );
break;
case PROTMESSID_CLM_PING_MS_WITHNUMCLIENTS:
bRet = EvaluateCLPingWithNumClientsMes ( InetAddr, vecData );
break;
case PROTMESSID_CLM_PING_MS_WITHNUMCLIENTS:
bRet = EvaluateCLPingWithNumClientsMes ( InetAddr, vecbyMesBodyData );
break;
case PROTMESSID_CLM_SERVER_FULL:
bRet = EvaluateCLServerFullMes();
break;
case PROTMESSID_CLM_SERVER_FULL:
bRet = EvaluateCLServerFullMes();
break;
case PROTMESSID_CLM_SERVER_LIST:
bRet = EvaluateCLServerListMes ( InetAddr, vecData );
break;
case PROTMESSID_CLM_SERVER_LIST:
bRet = EvaluateCLServerListMes ( InetAddr, vecbyMesBodyData );
break;
case PROTMESSID_CLM_REQ_SERVER_LIST:
bRet = EvaluateCLReqServerListMes ( InetAddr );
break;
case PROTMESSID_CLM_REQ_SERVER_LIST:
bRet = EvaluateCLReqServerListMes ( InetAddr );
break;
case PROTMESSID_CLM_SEND_EMPTY_MESSAGE:
bRet = EvaluateCLSendEmptyMesMes ( vecData );
break;
case PROTMESSID_CLM_SEND_EMPTY_MESSAGE:
bRet = EvaluateCLSendEmptyMesMes ( vecbyMesBodyData );
break;
case PROTMESSID_CLM_REGISTER_SERVER:
bRet = EvaluateCLRegisterServerMes ( InetAddr, vecData );
break;
case PROTMESSID_CLM_REGISTER_SERVER:
bRet = EvaluateCLRegisterServerMes ( InetAddr, vecbyMesBodyData );
break;
case PROTMESSID_CLM_UNREGISTER_SERVER:
bRet = EvaluateCLUnregisterServerMes ( InetAddr );
break;
case PROTMESSID_CLM_UNREGISTER_SERVER:
bRet = EvaluateCLUnregisterServerMes ( InetAddr );
break;
case PROTMESSID_CLM_DISCONNECTION:
bRet = EvaluateCLDisconnectionMes ( InetAddr );
break;
}
}
else
{
bRet = true; // return error code
case PROTMESSID_CLM_DISCONNECTION:
bRet = EvaluateCLDisconnectionMes ( InetAddr );
break;
}
}
else
@ -1872,11 +1854,11 @@ bool CProtocol::EvaluateCLDisconnectionMes ( const CHostAddress& InetAddr )
/******************************************************************************\
* Message generation and parsing *
\******************************************************************************/
bool CProtocol::ParseMessageFrame ( const CVector<uint8_t>& vecIn,
bool CProtocol::ParseMessageFrame ( const CVector<uint8_t>& vecbyData,
const int iNumBytesIn,
CVector<uint8_t>& vecbyMesBodyData,
int& iCnt,
int& iID,
CVector<uint8_t>& vecData )
int& iID )
{
int i;
int iLenBy;
@ -1893,7 +1875,7 @@ bool CProtocol::ParseMessageFrame ( const CVector<uint8_t>& vecIn,
iCurPos = 0; // start from beginning
// 2 bytes TAG
const int iTag = static_cast<int> ( GetValFromStream ( vecIn, iCurPos, 2 ) );
const int iTag = static_cast<int> ( GetValFromStream ( vecbyData, iCurPos, 2 ) );
// check if tag is correct
if ( iTag != 0 )
@ -1902,13 +1884,13 @@ bool CProtocol::ParseMessageFrame ( const CVector<uint8_t>& vecIn,
}
// 2 bytes ID
iID = static_cast<int> ( GetValFromStream ( vecIn, iCurPos, 2 ) );
iID = static_cast<int> ( GetValFromStream ( vecbyData, iCurPos, 2 ) );
// 1 byte cnt
iCnt = static_cast<int> ( GetValFromStream ( vecIn, iCurPos, 1 ) );
iCnt = static_cast<int> ( GetValFromStream ( vecbyData, iCurPos, 1 ) );
// 2 bytes length
iLenBy = static_cast<int> ( GetValFromStream ( vecIn, iCurPos, 2 ) );
iLenBy = static_cast<int> ( GetValFromStream ( vecbyData, iCurPos, 2 ) );
// make sure the length is correct
if ( iLenBy != iNumBytesIn - MESS_LEN_WITHOUT_DATA_BYTE )
@ -1927,24 +1909,24 @@ bool CProtocol::ParseMessageFrame ( const CVector<uint8_t>& vecIn,
for ( i = 0; i < iLenCRCCalc; i++ )
{
CRCObj.AddByte ( static_cast<uint8_t> (
GetValFromStream ( vecIn, iCurPos, 1 ) ) );
GetValFromStream ( vecbyData, iCurPos, 1 ) ) );
}
if ( CRCObj.GetCRC () != GetValFromStream ( vecIn, iCurPos, 2 ) )
if ( CRCObj.GetCRC () != GetValFromStream ( vecbyData, iCurPos, 2 ) )
{
return true; // return error code
}
// Extract actual data -----------------------------------------------------
vecData.Init ( iLenBy );
vecbyMesBodyData.Init ( iLenBy );
iCurPos = MESS_HEADER_LENGTH_BYTE; // start from beginning of data
for ( i = 0; i < iLenBy; i++ )
{
vecData[i] = static_cast<uint8_t> (
GetValFromStream ( vecIn, iCurPos, 1 ) );
vecbyMesBodyData[i] = static_cast<uint8_t> (
GetValFromStream ( vecbyData, iCurPos, 1 ) );
}
return false; // no error

View File

@ -118,15 +118,26 @@ public:
void CreateCLEmptyMes ( const CHostAddress& InetAddr );
void CreateCLDisconnection ( const CHostAddress& InetAddr );
bool ParseMessage ( const CVector<uint8_t>& vecbyData,
const int iNumBytes );
bool ParseMessageFrame ( const CVector<uint8_t>& vecbyData,
const int iNumBytesIn,
CVector<uint8_t>& vecbyMesBodyData,
int& iRecCounter,
int& iRecID );
bool ParseConnectionLessMessage ( const CVector<uint8_t>& vecbyData,
const int iNumBytes,
const CHostAddress& InetAddr );
bool ParseMessageBody ( const CVector<uint8_t>& vecbyMesBodyData,
const int iRecCounter,
const int iRecID );
bool IsProtocolMessage ( const CVector<uint8_t>& vecbyData,
const int iNumBytes );
bool ParseConnectionLessMessageWithFrame ( const CVector<uint8_t>& vecbyData,
const int iNumBytesIn,
const CHostAddress& InetAddr );
bool ParseConnectionLessMessageBody ( const CVector<uint8_t>& vecbyMesBodyData,
const int iRecID,
const CHostAddress& InetAddr );
bool IsConnectionLessMessageID ( const int iID ) const
{ return (iID >= 1000) & (iID < 2000); }
// this function is public because we need it in the test bench
void CreateAndImmSendAcknMess ( const int& iID,
@ -160,12 +171,6 @@ protected:
const int iCnt,
const int iID );
bool ParseMessageFrame ( const CVector<uint8_t>& vecIn,
const int iNumBytesIn,
int& iCnt,
int& iID,
CVector<uint8_t>& vecData );
void GenMessageFrame ( CVector<uint8_t>& vecOut,
const int iCnt,
const int iID,
@ -189,9 +194,6 @@ protected:
const int iMaxStringLen,
QString& strOut );
bool IsConnectionLessMessageID ( const int iID ) const
{ return (iID >= 1000) & (iID < 2000); }
void SendMessage();
void CreateAndSendMessage ( const int iID,
@ -280,9 +282,6 @@ signals:
void CLReqServerList ( CHostAddress InetAddr );
void CLSendEmptyMes ( CHostAddress TargetInetAddr );
void CLDisconnection ( CHostAddress InetAddr );
void DetectedCLMessage ( CVector<uint8_t> vecbyData,
int iNumBytes );
};
#endif /* !defined ( PROTOCOL_H__3B123453_4344_BB2392354455IUHF1912__INCLUDED_ ) */

View File

@ -537,15 +537,15 @@ void CServer::OnSendCLProtMessage ( CHostAddress InetAddr,
Socket.SendPacket ( vecMessage, InetAddr );
}
void CServer::OnDetCLMess ( const CVector<uint8_t>& vecbyData,
const int iNumBytes,
void CServer::OnDetCLMess ( const CVector<uint8_t>& vecbyMesBodyData,
const int iRecID,
const CHostAddress& InetAddr )
{
// this is a special case: we received a connection less message but we are
// in a connection
ConnLessProtocol.ParseConnectionLessMessage ( vecbyData,
iNumBytes,
InetAddr );
ConnLessProtocol.ParseConnectionLessMessageBody ( vecbyMesBodyData,
iRecID,
InetAddr );
}
void CServer::OnCLDisconnection ( CHostAddress InetAddr )
@ -1195,9 +1195,9 @@ bool CServer::PutData ( const CVector<uint8_t>& vecbyRecBuf,
{
// this is a new client, we then first check if this is a connection
// less message before we create a new official channel
if ( ConnLessProtocol.ParseConnectionLessMessage ( vecbyRecBuf,
iNumBytesRead,
HostAdr ) )
if ( ConnLessProtocol.ParseConnectionLessMessageWithFrame ( vecbyRecBuf,
iNumBytesRead,
HostAdr ) )
{
// a new client is calling, look for free channel
iCurChanID = GetFreeChan();

View File

@ -265,8 +265,8 @@ public slots:
void OnNewConnection ( int iChID );
void OnSendCLProtMessage ( CHostAddress InetAddr, CVector<uint8_t> vecMessage );
void OnDetCLMess ( const CVector<uint8_t>& vecbyData,
const int iNumBytes,
void OnDetCLMess ( const CVector<uint8_t>& vecbyMesBodyData,
const int iRecID,
const CHostAddress& InetAddr );
void OnCLPingReceived ( CHostAddress InetAddr, int iMs )
@ -325,18 +325,18 @@ public slots:
void OnSendProtMessCh10 ( CVector<uint8_t> mess ) { OnSendProtMessage ( 10, mess ); }
void OnSendProtMessCh11 ( CVector<uint8_t> mess ) { OnSendProtMessage ( 11, mess ); }
void OnDetCLMessCh0 ( CVector<uint8_t> vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[0].GetAddress() ); }
void OnDetCLMessCh1 ( CVector<uint8_t> vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[1].GetAddress() ); }
void OnDetCLMessCh2 ( CVector<uint8_t> vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[2].GetAddress() ); }
void OnDetCLMessCh3 ( CVector<uint8_t> vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[3].GetAddress() ); }
void OnDetCLMessCh4 ( CVector<uint8_t> vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[4].GetAddress() ); }
void OnDetCLMessCh5 ( CVector<uint8_t> vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[5].GetAddress() ); }
void OnDetCLMessCh6 ( CVector<uint8_t> vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[6].GetAddress() ); }
void OnDetCLMessCh7 ( CVector<uint8_t> vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[7].GetAddress() ); }
void OnDetCLMessCh8 ( CVector<uint8_t> vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[8].GetAddress() ); }
void OnDetCLMessCh9 ( CVector<uint8_t> vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[9].GetAddress() ); }
void OnDetCLMessCh10 ( CVector<uint8_t> vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[10].GetAddress() ); }
void OnDetCLMessCh11 ( CVector<uint8_t> vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[11].GetAddress() ); }
void OnDetCLMessCh0 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[0].GetAddress() ); }
void OnDetCLMessCh1 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[1].GetAddress() ); }
void OnDetCLMessCh2 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[2].GetAddress() ); }
void OnDetCLMessCh3 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[3].GetAddress() ); }
void OnDetCLMessCh4 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[4].GetAddress() ); }
void OnDetCLMessCh5 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[5].GetAddress() ); }
void OnDetCLMessCh6 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[6].GetAddress() ); }
void OnDetCLMessCh7 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[7].GetAddress() ); }
void OnDetCLMessCh8 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[8].GetAddress() ); }
void OnDetCLMessCh9 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[9].GetAddress() ); }
void OnDetCLMessCh10 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[10].GetAddress() ); }
void OnDetCLMessCh11 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[11].GetAddress() ); }
void OnNewConnectionCh0() { OnNewConnection ( 0 ); }
void OnNewConnectionCh1() { OnNewConnection ( 1 ); }

View File

@ -90,41 +90,58 @@ signals:
};
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
/* Socket which runs in a separate high priority thread ----------------------*/
/*
// TEST
// http://qt-project.org/forums/viewthread/14393
// http://qt-project.org/doc/qt-5.0/qtcore/qthread.html#Priority-enum
// http://qt-project.org/wiki/Threads_Events_QObjects
class CHighPrioSocket
class CHighPrioSocket : public QObject
{
Q_OBJECT
public:
CHighPrioSocket ( CChannel* pNewChannel,
const quint16 iPortNumber )
{
// TEST
worker = new CSocket ( pNewChannel, iPortNumber );
worker->moveToThread(&workerThread);
workerThread.start(QThread::TimeCriticalPriority);
// TEST we have to register some classes to the Qt signal/slot mechanism
// since now we actually have thread crossings with the new code
qRegisterMetaType<CVector<uint8_t> > ( "CVector<uint8_t>" );
qRegisterMetaType<CHostAddress> ( "CHostAddress" );
// TEST actual creation of the new thread, see internet references below:
// http://qt-project.org/forums/viewthread/14393
// http://qt-project.org/doc/qt-5.0/qtcore/qthread.html#Priority-enum
// http://qt-project.org/wiki/Threads_Events_QObjects
pSocket = new CSocket ( pNewChannel, iPortNumber );
pSocket->moveToThread ( &NetworkWorkerThread );
NetworkWorkerThread.start ( QThread::TimeCriticalPriority );
QObject::connect ( pSocket,
SIGNAL ( InvalidPacketReceived ( CVector<uint8_t>, int, CHostAddress ) ),
SIGNAL ( InvalidPacketReceived ( CVector<uint8_t>, int, CHostAddress ) ) );
}
virtual ~CHighPrioSocket()
{
NetworkWorkerThread.exit();
}
void SendPacket ( const CVector<uint8_t>& vecbySendBuf,
const CHostAddress& HostAddr )
{
worker->SendPacket ( vecbySendBuf, HostAddr );
pSocket->SendPacket ( vecbySendBuf, HostAddr );
}
protected:
QThread NetworkWorkerThread;
CSocket* pSocket;
// TEST
QThread workerThread;
CSocket* worker;
signals:
void InvalidPacketReceived ( CVector<uint8_t> vecbyRecBuf,
int iNumBytesRead,
CHostAddress RecHostAddr );
};
*/
#endif
#endif /* !defined ( SOCKET_HOIHGE76GEKJH98_3_4344_BB23945IUHF1912__INCLUDED_ ) */