From 6bdb82c65f895b455c82fb756fd980038e8fcc79 Mon Sep 17 00:00:00 2001 From: Volker Fischer Date: Mon, 3 Jun 2013 16:07:17 +0000 Subject: [PATCH] Added support for putting the receive socket in a separate thread. Unfortunately the current code does not work as expected. Some Qt warnings appear and after a while the receive socket does not receive any more network packets. Therefore a global macro is introduced to enable/disable the new code. Default is: disabled. --- src/channel.cpp | 303 ++++++++++++++++++++++++++--------------- src/channel.h | 19 ++- src/client.cpp | 18 +-- src/client.h | 6 +- src/clientdlg.cpp | 9 -- src/global.h | 18 +++ src/main.cpp | 10 +- src/protocol.cpp | 340 ++++++++++++++++++++++------------------------ src/protocol.h | 37 +++-- src/server.cpp | 16 +-- src/server.h | 28 ++-- src/socket.h | 53 +++++--- 12 files changed, 483 insertions(+), 374 deletions(-) diff --git a/src/channel.cpp b/src/channel.cpp index 7e914cda..3e7cc22e 100755 --- a/src/channel.cpp +++ b/src/channel.cpp @@ -102,10 +102,6 @@ QObject::connect ( &Protocol, SIGNAL ( OpusSupported() ), SIGNAL ( OpusSupported() ) ); - QObject::connect ( &Protocol, - SIGNAL ( DetectedCLMessage ( CVector, int ) ), - SIGNAL ( DetectedCLMessage ( CVector, int ) ) ); - QObject::connect ( &Protocol, SIGNAL ( NetTranspPropsReceived ( CNetworkTransportProps ) ), this, SLOT ( OnNetTranspPropsReceived ( CNetworkTransportProps ) ) ); @@ -113,6 +109,14 @@ QObject::connect ( &Protocol, QObject::connect ( &Protocol, SIGNAL ( ReqNetTranspProps() ), this, SLOT ( OnReqNetTranspProps() ) ); + +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + // this connection is intended for a thread transition if we have a + // separate socket thread running + QObject::connect ( this, + SIGNAL ( ParseMessageBody ( CVector, int, int ) ), + this, SLOT ( OnParseMessageBody ( CVector, int, int ) ) ); +#endif } bool CChannel::ProtocolIsEnabled() @@ -151,60 +155,83 @@ void CChannel::SetAudioStreamProperties ( const EAudComprType eNewAudComprType, const int iNewNetwFrameSizeFact, const int iNewNumAudioChannels ) { - // this function is intended for the server (not the client) - QMutexLocker locker ( &Mutex ); +/* + this function is intended for the server (not the client) +*/ - // store new values - eAudioCompressionType = eNewAudComprType; - iNumAudioChannels = iNewNumAudioChannels; - iNetwFrameSize = iNewNetwFrameSize; - iNetwFrameSizeFact = iNewNetwFrameSizeFact; + CNetworkTransportProps NetworkTransportProps; - // init socket buffer - SockBuf.Init ( iNetwFrameSize, iCurSockBufNumFrames ); + Mutex.lock(); + { + // store new values + eAudioCompressionType = eNewAudComprType; + iNumAudioChannels = iNewNumAudioChannels; + iNetwFrameSize = iNewNetwFrameSize; + iNetwFrameSizeFact = iNewNetwFrameSizeFact; - // init conversion buffer - ConvBuf.Init ( iNetwFrameSize * iNetwFrameSizeFact ); + // init socket buffer + SockBuf.Init ( iNetwFrameSize, iCurSockBufNumFrames ); - // tell the server that audio coding has changed - CreateNetTranspPropsMessFromCurrentSettings(); + // init conversion buffer + ConvBuf.Init ( iNetwFrameSize * iNetwFrameSizeFact ); + + // fill network transport properties struct + NetworkTransportProps = + GetNetworkTransportPropsFromCurrentSettings(); + } + Mutex.unlock(); + + // tell the server about the new network settings + Protocol.CreateNetwTranspPropsMes ( NetworkTransportProps ); } bool CChannel::SetSockBufNumFrames ( const int iNewNumFrames, const bool bPreserve ) { - QMutexLocker locker ( &Mutex ); // this operation must be done with mutex + bool ReturnValue = true; // init with error + bool bCurDoAutoSockBufSize = false; // we have to init but init values does not matter - // first check for valid input parameter range - if ( ( iNewNumFrames >= MIN_NET_BUF_SIZE_NUM_BL ) && - ( iNewNumFrames <= MAX_NET_BUF_SIZE_NUM_BL ) ) + Mutex.lock(); { - // only apply parameter if new parameter is different from current one - if ( iCurSockBufNumFrames != iNewNumFrames ) + // first check for valid input parameter range + if ( ( iNewNumFrames >= MIN_NET_BUF_SIZE_NUM_BL ) && + ( iNewNumFrames <= MAX_NET_BUF_SIZE_NUM_BL ) ) { - // store new value - iCurSockBufNumFrames = iNewNumFrames; - - // the network block size is a multiple of the minimum network - // block size - SockBuf.Init ( iNetwFrameSize, iNewNumFrames, bPreserve ); - - // only in case we are the server and auto jitter buffer setting is - // enabled, we have to report the current setting to the client - if ( bIsServer && bDoAutoSockBufSize ) + // only apply parameter if new parameter is different from current one + if ( iCurSockBufNumFrames != iNewNumFrames ) { - // we cannot call the "CreateJitBufMes" function directly since - // this would give us problems with different threads (e.g. the - // timer thread) and the protocol mechanism (problem with - // qRegisterMetaType(), etc.) - emit ServerAutoSockBufSizeChange ( iNewNumFrames ); - } + // store new value + iCurSockBufNumFrames = iNewNumFrames; - return false; // -> no error + // the network block size is a multiple of the minimum network + // block size + SockBuf.Init ( iNetwFrameSize, iNewNumFrames, bPreserve ); + + // store current auto socket buffer size setting in the mutex + // region since if we use the current parameter below in the + // if condition, it may have been changed in between the time + // when we have left the mutex region and entered the if + // condition + bCurDoAutoSockBufSize = bDoAutoSockBufSize; + + ReturnValue = false; // -> no error + } } } + Mutex.unlock(); - return true; // set error flag + // only in case there is no error, we are the server and auto jitter buffer + // setting is enabled, we have to report the current setting to the client + if ( !ReturnValue && bIsServer && bCurDoAutoSockBufSize ) + { + // we cannot call the "CreateJitBufMes" function directly since + // this would give us problems with different threads (e.g. the + // timer thread) and the protocol mechanism (problem with + // qRegisterMetaType(), etc.) + emit ServerAutoSockBufSizeChange ( iNewNumFrames ); + } + + return ReturnValue; // set error flag } void CChannel::SetGain ( const int iChanID, @@ -373,7 +400,7 @@ void CChannel::OnNetTranspPropsReceived ( CNetworkTransportProps NetworkTranspor // if old CELT codec is used, inform the client that the new OPUS codec // is supported - if ( eAudioCompressionType != CT_OPUS ) + if ( NetworkTransportProps.eAudioCodingType != CT_OPUS ) { Protocol.CreateOpusSupportedMes(); } @@ -382,12 +409,15 @@ void CChannel::OnNetTranspPropsReceived ( CNetworkTransportProps NetworkTranspor void CChannel::OnReqNetTranspProps() { - CreateNetTranspPropsMessFromCurrentSettings(); + // fill network transport properties struct from current settings and send it + Protocol.CreateNetwTranspPropsMes ( GetNetworkTransportPropsFromCurrentSettings() ); } -void CChannel::CreateNetTranspPropsMessFromCurrentSettings() +CNetworkTransportProps CChannel::GetNetworkTransportPropsFromCurrentSettings() { - CNetworkTransportProps NetworkTransportProps ( + // use current stored settings of the channel to fill the network transport + // properties structure + return CNetworkTransportProps ( iNetwFrameSize, iNetwFrameSizeFact, iNumAudioChannels, @@ -395,9 +425,6 @@ void CChannel::CreateNetTranspPropsMessFromCurrentSettings() eAudioCompressionType, 0, // version of the codec 0 ); - - // send current network transport properties - Protocol.CreateNetwTranspPropsMes ( NetworkTransportProps ); } void CChannel::Disconnect() @@ -415,45 +442,88 @@ void CChannel::Disconnect() EPutDataStat CChannel::PutData ( const CVector& vecbyData, int iNumBytes ) { - EPutDataStat eRet = PS_GEN_ERROR; +/* + Note that this function might be called from a different thread (separate + Socket thread) and therefore we should not call functions which emit signals + themself directly but emit a signal here so that the thread transition is + done as early as possible. + This is the reason why "ParseMessageBody" is not called directly but through a + signal-slot mechanism. +*/ - // init flags - bool bIsProtocolPacket = false; - bool bNewConnection = false; + // init return state + EPutDataStat eRet = PS_GEN_ERROR; if ( bIsEnabled ) { - // first check if this is protocol data - // only use protocol data if protocol mechanism is enabled - if ( ProtocolIsEnabled() ) + int iRecCounter; + int iRecID; + CVector vecbyMesBodyData; + + // init flag + bool bNewConnection = false; + + // check if this is a protocol message by trying to parse the message + // frame + if ( !Protocol.ParseMessageFrame ( vecbyData, + iNumBytes, + vecbyMesBodyData, + iRecCounter, + iRecID ) ) { - // parse the message assuming this is a protocol message - if ( !Protocol.ParseMessage ( vecbyData, iNumBytes ) ) + // This is a protocol message: + + // only use protocol data if protocol mechanism is enabled + if ( ProtocolIsEnabled() ) { - // set status flags - eRet = PS_PROT_OK; - bIsProtocolPacket = true; + // in case this is a connection less message, we do not process it here + if ( Protocol.IsConnectionLessMessageID ( iRecID ) ) + { + // fire a signal so that an other class can process this type of + // message + emit DetectedCLMessage ( vecbyMesBodyData, iRecID ); + + // set status flag + eRet = PS_PROT_OK; + } + else + { +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + // parse the message assuming this is a regular protocol message + emit ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID ); + + // note that protocol OK is not correct here since we do not + // check if the protocol was ok since we emit just a signal + // and do not get any feedback on the protocol decoding state + eRet = PS_PROT_OK; +#else + // parse the message assuming this is a protocol message + if ( !Protocol.ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID ) ) + { + // set status flag + eRet = PS_PROT_OK; + } +#endif + } + } + else + { + // In case we are the server and the current channel is not + // connected, we do not evaluate protocol messages but these + // messages could start the server which is not desired, + // especially not for the disconnect messages. + // We now do not start the server if a valid protocol message + // was received but only start the server on audio packets. + + // set status flag + eRet = PS_PROT_OK_MESS_NOT_EVALUATED; } } else { - // In case we are the server and the current channel is not - // connected, we do not evaluate protocal messages but these - // messages could start the server which is not desired, especially - // not for the disconnect messages. - // We now do not start the server if a valid protocol message - // was received but only start the server on audio packets - if ( Protocol.IsProtocolMessage ( vecbyData, iNumBytes ) ) - { - // set status flags - eRet = PS_PROT_OK_MESS_NOT_EVALUATED; - bIsProtocolPacket = true; - } - } + // This seems to be an audio packet (only try to parse audio if it + // was not a protocol packet): - // only try to parse audio if it was not a protocol packet - if ( !bIsProtocolPacket ) - { Mutex.lock(); { // only process audio if packet has correct size @@ -476,12 +546,12 @@ EPutDataStat CChannel::PutData ( const CVector& vecbyData, eRet = PS_PROT_ERR; } - // all network packets except of valid protocol messages + // All network packets except of valid protocol messages // regardless if they are valid or invalid audio packets lead to - // a state change to a connected channel - // this is because protocol messages can only be sent on a + // a state change to a connected channel. + // This is because protocol messages can only be sent on a // connected channel and the client has to inform the server - // about the audio packet properties via the protocol + // about the audio packet properties via the protocol. // check if channel was not connected, this is a new connection // (do not fire an event directly since we are inside a mutex @@ -506,55 +576,62 @@ EPutDataStat CChannel::PutData ( const CVector& vecbyData, EGetDataStat CChannel::GetData ( CVector& vecbyData ) { - QMutexLocker locker ( &Mutex ); - EGetDataStat eGetStatus; - const bool bSockBufState = SockBuf.Get ( vecbyData ); - - // decrease time-out counter - if ( iConTimeOut > 0 ) + Mutex.lock(); { - // subtract the number of samples of the current block since the - // time out counter is based on samples not on blocks (definition: - // always one atomic block is get by using the GetData() function - // where the atomic block size is "SYSTEM_FRAME_SIZE_SAMPLES") + // the socket access must be inside a mutex + const bool bSockBufState = SockBuf.Get ( vecbyData ); + + // decrease time-out counter + if ( iConTimeOut > 0 ) + { + // subtract the number of samples of the current block since the + // time out counter is based on samples not on blocks (definition: + // always one atomic block is get by using the GetData() function + // where the atomic block size is "SYSTEM_FRAME_SIZE_SAMPLES") // TODO this code only works with the above assumption -> better // implementation so that we are not depending on assumptions - iConTimeOut -= SYSTEM_FRAME_SIZE_SAMPLES; + iConTimeOut -= SYSTEM_FRAME_SIZE_SAMPLES; - if ( iConTimeOut <= 0 ) - { - // channel is just disconnected - eGetStatus = GS_CHAN_NOW_DISCONNECTED; - iConTimeOut = 0; // make sure we do not have negative values - - // reset network transport properties - ResetNetworkTransportProperties(); - - // emit message - emit Disconnected(); - } - else - { - if ( bSockBufState ) + if ( iConTimeOut <= 0 ) { - // everything is ok - eGetStatus = GS_BUFFER_OK; + // channel is just disconnected + eGetStatus = GS_CHAN_NOW_DISCONNECTED; + iConTimeOut = 0; // make sure we do not have negative values + + // reset network transport properties + ResetNetworkTransportProperties(); } else { - // channel is not yet disconnected but no data in buffer - eGetStatus = GS_BUFFER_UNDERRUN; + if ( bSockBufState ) + { + // everything is ok + eGetStatus = GS_BUFFER_OK; + } + else + { + // channel is not yet disconnected but no data in buffer + eGetStatus = GS_BUFFER_UNDERRUN; + } } } + else + { + // channel is disconnected + eGetStatus = GS_CHAN_NOT_CONNECTED; + } } - else + Mutex.unlock(); + + // in case we are just disconnected, we have to fire a message + if ( eGetStatus == GS_CHAN_NOW_DISCONNECTED ) { - // channel is disconnected - eGetStatus = GS_CHAN_NOT_CONNECTED; + // emit message + emit Disconnected(); } return eGetStatus; diff --git a/src/channel.h b/src/channel.h index 7abaf865..a2f9bf10 100755 --- a/src/channel.h +++ b/src/channel.h @@ -150,7 +150,7 @@ void CreateConClientListNameMes ( const CVector& vecChanInfo ) void CreateConClientListMes ( const CVector& vecChanInfo ) { Protocol.CreateConClientListMes ( vecChanInfo ); } - void CreateNetTranspPropsMessFromCurrentSettings(); + CNetworkTransportProps GetNetworkTransportPropsFromCurrentSettings(); protected: bool ProtocolIsEnabled(); @@ -209,6 +209,12 @@ public slots: void OnNetTranspPropsReceived ( CNetworkTransportProps NetworkTransportProps ); void OnReqNetTranspProps(); +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + void OnParseMessageBody ( CVector vecbyMesBodyData, + int iRecCounter, + int iRecID ) { Protocol.ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID ); } +#endif + signals: void MessReadyForSending ( CVector vecMessage ); void NewConnection(); @@ -225,8 +231,15 @@ signals: void PingReceived ( int iMs ); void ReqNetTranspProps(); void Disconnected(); - void DetectedCLMessage ( CVector vecbyData, - int iNumBytes ); + + void DetectedCLMessage ( CVector vecbyMesBodyData, + int iRecID ); + +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + void ParseMessageBody ( CVector vecbyMesBodyData, + int iRecCounter, + int iRecID ); +#endif }; #endif /* !defined ( CHANNEL_HOIH9345KJH98_3_4344_BB23945IUHF1912__INCLUDED_ ) */ diff --git a/src/client.cpp b/src/client.cpp index 30774e61..d05e3386 100755 --- a/src/client.cpp +++ b/src/client.cpp @@ -225,29 +225,29 @@ void CClient::OnInvalidPacketReceived ( CVector vecbyRecBuf, // e.g., open the connection setup dialog since then we are not // yet connected but talk to the central server with the // connection less protocol) - if ( ConnLessProtocol.ParseConnectionLessMessage ( vecbyRecBuf, - iNumBytesRead, - RecHostAddr ) ) + if ( ConnLessProtocol.ParseConnectionLessMessageWithFrame ( vecbyRecBuf, + iNumBytesRead, + RecHostAddr ) ) { // message coult not be parsed, check if the packet comes // from the server we just connected -> if yes, send // disconnect message since the server may not know that we // are not connected anymore - if ( Channel. GetAddress() == RecHostAddr ) + if ( Channel.GetAddress() == RecHostAddr ) { ConnLessProtocol.CreateCLDisconnection ( RecHostAddr ); } } } -void CClient::OnDetectedCLMessage ( CVector vecbyData, - int iNumBytes ) +void CClient::OnDetectedCLMessage ( CVector vecbyMesBodyData, + int iRecID ) { // this is a special case: we received a connection less message but we are // in a connection - ConnLessProtocol.ParseConnectionLessMessage ( vecbyData, - iNumBytes, - Channel.GetAddress() ); + ConnLessProtocol.ParseConnectionLessMessageBody ( vecbyMesBodyData, + iRecID, + Channel.GetAddress() ); } void CClient::OnJittBufSizeChanged ( int iNewJitBufSize ) diff --git a/src/client.h b/src/client.h index 9e97b562..acb8806a 100755 --- a/src/client.h +++ b/src/client.h @@ -301,7 +301,11 @@ void SetAudoCompressiontype ( const EAudComprType eNAudCompressionType ); bool bIsInitializationPhase; CVector vecCeltData; +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + CHighPrioSocket Socket; +#else CSocket Socket; +#endif CSound Sound; CStereoSignalLevelMeter SignalLevelMeter; @@ -350,7 +354,7 @@ public slots: void OnInvalidPacketReceived ( CVector vecbyRecBuf, int iNumBytesRead, CHostAddress RecHostAddr ); - void OnDetectedCLMessage ( CVector vecbyData, int iNumBytes ); + void OnDetectedCLMessage ( CVector vecbyMesBodyData, int iRecID ); void OnReqJittBufSize() { CreateServerJitterBufferMessage(); } void OnJittBufSizeChanged ( int iNewJitBufSize ); void OnReqChanInfo() { Channel.SetRemoteInfo ( ChannelInfo ); } diff --git a/src/clientdlg.cpp b/src/clientdlg.cpp index ec963848..63e91258 100755 --- a/src/clientdlg.cpp +++ b/src/clientdlg.cpp @@ -324,17 +324,8 @@ CClientDlg::CClientDlg ( CClient* pNCliP, // View menu -------------------------------------------------------------- pViewMenu = new QMenu ( "&View", this ); - -// TODO This View menu item is disabled for now. The reason for that is that if -// if the connection setup dialog is opened during an active connection, we will -// get audio drop outs everytime the ping times in the list are updated which is -// not acceptable. -// Fix the issue with the thread priorities and enable the menu item again... -/* pViewMenu->addAction ( tr ( "&Connection Setup..." ), this, SLOT ( OnOpenConnectionSetupDialog() ) ); -*/ - pViewMenu->addAction ( tr ( "C&hat..." ), this, SLOT ( OnOpenChatDialog() ) ); diff --git a/src/global.h b/src/global.h index 513a87ef..ae205dca 100755 --- a/src/global.h +++ b/src/global.h @@ -65,6 +65,24 @@ LED bar: lbr /* Definitions ****************************************************************/ + + + +// The receive socket should be put in a high priority thread to ensure the GUI +// does not effect the stability of the audio stream (e.g. if the GUI is on +// high load because of a table update, the incoming network packets must still +// be put in the jitter buffer with highest priority). +// Unfortunately, the current implementation of the separate thread does not +// work correctly. Therefore we introduced this macro to enable/disable the new +// code. If the new code is enabled, under Windows Qt warnings like +// "QSocketNotifier: Multiple socket notifiers for same socket" appear and after +// a while the receive socket does not receive any new network packets anymore. +// This has to be solved! +//#define ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD +#undef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + + + // define this macro to get debug output //#define _DEBUG_ #undef _DEBUG_ diff --git a/src/main.cpp b/src/main.cpp index c0e666f0..5bb69110 100755 --- a/src/main.cpp +++ b/src/main.cpp @@ -372,9 +372,17 @@ int main ( int argc, char** argv ) QApplication app ( argc, argv, bUseGUI ); #ifdef _WIN32 - // Set application priority class -> high priority + // set application priority class -> high priority SetPriorityClass ( GetCurrentProcess(), HIGH_PRIORITY_CLASS ); +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + // Since we have a higher priority for the entire application, we can give + // the GUI thread the lowest possible priority which is still above the + // Windows "normal" priority so that it does not slow down the sound card + // and network working threads. + QThread::currentThread()->setPriority ( QThread::LowestPriority ); +#endif + // For accessible support we need to add a plugin to qt. The plugin has to // be located in the install directory of the software by the installer. // Here, we set the path to our application path. diff --git a/src/protocol.cpp b/src/protocol.cpp index 01847328..5d164fdb 100755 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -425,176 +425,168 @@ void CProtocol::CreateAndImmSendConLessMessage ( const int iID, emit CLMessReadyForSending ( InetAddr, vecNewMessage ); } -bool CProtocol::IsProtocolMessage ( const CVector& vecbyData, - const int iNumBytes ) -{ -/* - just check if this is a protocol message but do not act on message -*/ - int iRecCounter, iRecID; - CVector vecData; - - return !ParseMessageFrame ( vecbyData, - iNumBytes, - iRecCounter, - iRecID, - vecData ); -} - -bool CProtocol::ParseMessage ( const CVector& vecbyData, - const int iNumBytes ) +bool CProtocol::ParseMessageBody ( const CVector& vecbyMesBodyData, + const int iRecCounter, + const int iRecID ) { /* return code: false -> ok; true -> error */ - bool bRet = false; - bool bSendNextMess; - int iRecCounter, iRecID; - CVector vecData; - - if ( !ParseMessageFrame ( vecbyData, iNumBytes, iRecCounter, iRecID, vecData ) ) - { + bool bRet = false; + bool bSendNextMess; /* // TEST channel implementation: randomly delete protocol messages (50 % loss) if ( rand() < ( RAND_MAX / 2 ) ) return false; */ - // in case this is a connection less message, we do not process it here - if ( IsConnectionLessMessageID ( iRecID ) ) + // In case we received a message and returned an answer but our answer + // did not make it to the receiver, he will resend his message. We check + // here if the message is the same as the old one, and if this is the + // case, just resend our old answer again + if ( ( iOldRecID == iRecID ) && ( iOldRecCnt == iRecCounter ) ) + { + // acknowledgments are not acknowledged + if ( iRecID != PROTMESSID_ACKN ) { - // fire a signal so that an other class can process this type of - // message - emit DetectedCLMessage ( vecbyData, iNumBytes ); - - // return function without issuing an error code (since it is a - // regular message but will just not processed here) - return false; + // resend acknowledgement + CreateAndImmSendAcknMess ( iRecID, iRecCounter ); } - - // In case we received a message and returned an answer but our answer - // did not make it to the receiver, he will resend his message. We check - // here if the message is the same as the old one, and if this is the - // case, just resend our old answer again - if ( ( iOldRecID == iRecID ) && ( iOldRecCnt == iRecCounter ) ) + } + else + { + // special treatment for acknowledge messages + if ( iRecID == PROTMESSID_ACKN ) { - // acknowledgments are not acknowledged - if ( iRecID != PROTMESSID_ACKN ) + // extract data from stream and emit signal for received value + int iPos = 0; + const int iData = + static_cast ( GetValFromStream ( vecbyMesBodyData, iPos, 2 ) ); + + Mutex.lock(); { - // resend acknowledgement - CreateAndImmSendAcknMess ( iRecID, iRecCounter ); + // check if this is the correct acknowledgment + bSendNextMess = false; + if ( !SendMessQueue.empty() ) + { + if ( ( SendMessQueue.front().iCnt == iRecCounter ) && + ( SendMessQueue.front().iID == iData ) ) + { + // message acknowledged, remove from queue + SendMessQueue.pop_front(); + + // send next message in queue + bSendNextMess = true; + } + } + } + Mutex.unlock(); + + if ( bSendNextMess ) + { + SendMessage(); } } else { - // special treatment for acknowledge messages - if ( iRecID == PROTMESSID_ACKN ) + // check which type of message we received and do action + switch ( iRecID ) { - // extract data from stream and emit signal for received value - int iPos = 0; - const int iData = - static_cast ( GetValFromStream ( vecData, iPos, 2 ) ); + case PROTMESSID_JITT_BUF_SIZE: + bRet = EvaluateJitBufMes ( vecbyMesBodyData ); + break; - Mutex.lock(); - { - // check if this is the correct acknowledgment - bSendNextMess = false; - if ( !SendMessQueue.empty() ) - { - if ( ( SendMessQueue.front().iCnt == iRecCounter ) && - ( SendMessQueue.front().iID == iData ) ) - { - // message acknowledged, remove from queue - SendMessQueue.pop_front(); + case PROTMESSID_REQ_JITT_BUF_SIZE: + bRet = EvaluateReqJitBufMes(); + break; - // send next message in queue - bSendNextMess = true; - } - } - } - Mutex.unlock(); - - if ( bSendNextMess ) - { - SendMessage(); - } - } - else - { - // check which type of message we received and do action - switch ( iRecID ) - { - case PROTMESSID_JITT_BUF_SIZE: - bRet = EvaluateJitBufMes ( vecData ); - break; - - case PROTMESSID_REQ_JITT_BUF_SIZE: - bRet = EvaluateReqJitBufMes(); - break; - - case PROTMESSID_CHANNEL_GAIN: - bRet = EvaluateChanGainMes ( vecData ); - break; + case PROTMESSID_CHANNEL_GAIN: + bRet = EvaluateChanGainMes ( vecbyMesBodyData ); + break; // #### COMPATIBILITY OLD VERSION, TO BE REMOVED #### case PROTMESSID_CONN_CLIENTS_LIST_NAME: - bRet = EvaluateConClientListNameMes ( vecData ); + bRet = EvaluateConClientListNameMes ( vecbyMesBodyData ); break; - case PROTMESSID_CONN_CLIENTS_LIST: - bRet = EvaluateConClientListMes ( vecData ); - break; + case PROTMESSID_CONN_CLIENTS_LIST: + bRet = EvaluateConClientListMes ( vecbyMesBodyData ); + break; - case PROTMESSID_REQ_CONN_CLIENTS_LIST: - bRet = EvaluateReqConnClientsList(); - break; + case PROTMESSID_REQ_CONN_CLIENTS_LIST: + bRet = EvaluateReqConnClientsList(); + break; // #### COMPATIBILITY OLD VERSION, TO BE REMOVED #### case PROTMESSID_CHANNEL_NAME: - bRet = EvaluateChanNameMes ( vecData ); + bRet = EvaluateChanNameMes ( vecbyMesBodyData ); break; - case PROTMESSID_CHANNEL_INFOS: - bRet = EvaluateChanInfoMes ( vecData ); - break; + case PROTMESSID_CHANNEL_INFOS: + bRet = EvaluateChanInfoMes ( vecbyMesBodyData ); + break; - case PROTMESSID_REQ_CHANNEL_INFOS: - bRet = EvaluateReqChanInfoMes(); - break; + case PROTMESSID_REQ_CHANNEL_INFOS: + bRet = EvaluateReqChanInfoMes(); + break; - case PROTMESSID_CHAT_TEXT: - bRet = EvaluateChatTextMes ( vecData ); - break; + case PROTMESSID_CHAT_TEXT: + bRet = EvaluateChatTextMes ( vecbyMesBodyData ); + break; // #### COMPATIBILITY OLD VERSION, TO BE REMOVED #### case PROTMESSID_PING_MS: - bRet = EvaluatePingMes ( vecData ); + bRet = EvaluatePingMes ( vecbyMesBodyData ); break; - case PROTMESSID_NETW_TRANSPORT_PROPS: - bRet = EvaluateNetwTranspPropsMes ( vecData ); - break; + case PROTMESSID_NETW_TRANSPORT_PROPS: + bRet = EvaluateNetwTranspPropsMes ( vecbyMesBodyData ); + break; - case PROTMESSID_REQ_NETW_TRANSPORT_PROPS: - bRet = EvaluateReqNetwTranspPropsMes(); - break; + case PROTMESSID_REQ_NETW_TRANSPORT_PROPS: + bRet = EvaluateReqNetwTranspPropsMes(); + break; // #### COMPATIBILITY OLD VERSION, TO BE REMOVED #### case PROTMESSID_OPUS_SUPPORTED: bRet = EvaluateOpusSupportedMes(); break; - } - - // immediately send acknowledge message - CreateAndImmSendAcknMess ( iRecID, iRecCounter ); - - // save current message ID and counter to find out if message - // was resent - iOldRecID = iRecID; - iOldRecCnt = iRecCounter; } + + // immediately send acknowledge message + CreateAndImmSendAcknMess ( iRecID, iRecCounter ); + + // save current message ID and counter to find out if message + // was resent + iOldRecID = iRecID; + iOldRecCnt = iRecCounter; } } + + return bRet; +} + +bool CProtocol::ParseConnectionLessMessageWithFrame ( const CVector& vecbyData, + const int iNumBytesIn, + const CHostAddress& InetAddr ) +{ +/* + return code: false -> ok; true -> error +*/ + bool bRet = false; + int iRecCounter, iRecID; + CVector vecbyMesBodyData; + + if ( !ParseMessageFrame ( vecbyData, + iNumBytesIn, + vecbyMesBodyData, + iRecCounter, + iRecID ) ) + { + bRet = ParseConnectionLessMessageBody ( vecbyMesBodyData, + iRecID, + InetAddr ); + } else { bRet = true; // return error code @@ -603,70 +595,60 @@ case PROTMESSID_OPUS_SUPPORTED: return bRet; } -bool CProtocol::ParseConnectionLessMessage ( const CVector& vecbyData, - const int iNumBytes, - const CHostAddress& InetAddr ) +bool CProtocol::ParseConnectionLessMessageBody ( const CVector& vecbyMesBodyData, + const int iRecID, + const CHostAddress& InetAddr ) { /* return code: false -> ok; true -> error */ - bool bRet = false; - int iRecCounter, iRecID; - CVector vecData; - - if ( !ParseMessageFrame ( vecbyData, iNumBytes, iRecCounter, iRecID, vecData ) ) - { + bool bRet = false; /* // TEST channel implementation: randomly delete protocol messages (50 % loss) if ( rand() < ( RAND_MAX / 2 ) ) return false; */ - if ( IsConnectionLessMessageID ( iRecID ) ) + if ( IsConnectionLessMessageID ( iRecID ) ) + { + // check which type of message we received and do action + switch ( iRecID ) { - // check which type of message we received and do action - switch ( iRecID ) - { - case PROTMESSID_CLM_PING_MS: - bRet = EvaluateCLPingMes ( InetAddr, vecData ); - break; + case PROTMESSID_CLM_PING_MS: + bRet = EvaluateCLPingMes ( InetAddr, vecbyMesBodyData ); + break; - case PROTMESSID_CLM_PING_MS_WITHNUMCLIENTS: - bRet = EvaluateCLPingWithNumClientsMes ( InetAddr, vecData ); - break; + case PROTMESSID_CLM_PING_MS_WITHNUMCLIENTS: + bRet = EvaluateCLPingWithNumClientsMes ( InetAddr, vecbyMesBodyData ); + break; - case PROTMESSID_CLM_SERVER_FULL: - bRet = EvaluateCLServerFullMes(); - break; + case PROTMESSID_CLM_SERVER_FULL: + bRet = EvaluateCLServerFullMes(); + break; - case PROTMESSID_CLM_SERVER_LIST: - bRet = EvaluateCLServerListMes ( InetAddr, vecData ); - break; + case PROTMESSID_CLM_SERVER_LIST: + bRet = EvaluateCLServerListMes ( InetAddr, vecbyMesBodyData ); + break; - case PROTMESSID_CLM_REQ_SERVER_LIST: - bRet = EvaluateCLReqServerListMes ( InetAddr ); - break; + case PROTMESSID_CLM_REQ_SERVER_LIST: + bRet = EvaluateCLReqServerListMes ( InetAddr ); + break; - case PROTMESSID_CLM_SEND_EMPTY_MESSAGE: - bRet = EvaluateCLSendEmptyMesMes ( vecData ); - break; + case PROTMESSID_CLM_SEND_EMPTY_MESSAGE: + bRet = EvaluateCLSendEmptyMesMes ( vecbyMesBodyData ); + break; - case PROTMESSID_CLM_REGISTER_SERVER: - bRet = EvaluateCLRegisterServerMes ( InetAddr, vecData ); - break; + case PROTMESSID_CLM_REGISTER_SERVER: + bRet = EvaluateCLRegisterServerMes ( InetAddr, vecbyMesBodyData ); + break; - case PROTMESSID_CLM_UNREGISTER_SERVER: - bRet = EvaluateCLUnregisterServerMes ( InetAddr ); - break; + case PROTMESSID_CLM_UNREGISTER_SERVER: + bRet = EvaluateCLUnregisterServerMes ( InetAddr ); + break; - case PROTMESSID_CLM_DISCONNECTION: - bRet = EvaluateCLDisconnectionMes ( InetAddr ); - break; - } - } - else - { - bRet = true; // return error code + case PROTMESSID_CLM_DISCONNECTION: + bRet = EvaluateCLDisconnectionMes ( InetAddr ); + break; } } else @@ -1872,11 +1854,11 @@ bool CProtocol::EvaluateCLDisconnectionMes ( const CHostAddress& InetAddr ) /******************************************************************************\ * Message generation and parsing * \******************************************************************************/ -bool CProtocol::ParseMessageFrame ( const CVector& vecIn, +bool CProtocol::ParseMessageFrame ( const CVector& vecbyData, const int iNumBytesIn, + CVector& vecbyMesBodyData, int& iCnt, - int& iID, - CVector& vecData ) + int& iID ) { int i; int iLenBy; @@ -1893,7 +1875,7 @@ bool CProtocol::ParseMessageFrame ( const CVector& vecIn, iCurPos = 0; // start from beginning // 2 bytes TAG - const int iTag = static_cast ( GetValFromStream ( vecIn, iCurPos, 2 ) ); + const int iTag = static_cast ( GetValFromStream ( vecbyData, iCurPos, 2 ) ); // check if tag is correct if ( iTag != 0 ) @@ -1902,13 +1884,13 @@ bool CProtocol::ParseMessageFrame ( const CVector& vecIn, } // 2 bytes ID - iID = static_cast ( GetValFromStream ( vecIn, iCurPos, 2 ) ); + iID = static_cast ( GetValFromStream ( vecbyData, iCurPos, 2 ) ); // 1 byte cnt - iCnt = static_cast ( GetValFromStream ( vecIn, iCurPos, 1 ) ); + iCnt = static_cast ( GetValFromStream ( vecbyData, iCurPos, 1 ) ); // 2 bytes length - iLenBy = static_cast ( GetValFromStream ( vecIn, iCurPos, 2 ) ); + iLenBy = static_cast ( GetValFromStream ( vecbyData, iCurPos, 2 ) ); // make sure the length is correct if ( iLenBy != iNumBytesIn - MESS_LEN_WITHOUT_DATA_BYTE ) @@ -1927,24 +1909,24 @@ bool CProtocol::ParseMessageFrame ( const CVector& vecIn, for ( i = 0; i < iLenCRCCalc; i++ ) { CRCObj.AddByte ( static_cast ( - GetValFromStream ( vecIn, iCurPos, 1 ) ) ); + GetValFromStream ( vecbyData, iCurPos, 1 ) ) ); } - if ( CRCObj.GetCRC () != GetValFromStream ( vecIn, iCurPos, 2 ) ) + if ( CRCObj.GetCRC () != GetValFromStream ( vecbyData, iCurPos, 2 ) ) { return true; // return error code } // Extract actual data ----------------------------------------------------- - vecData.Init ( iLenBy ); + vecbyMesBodyData.Init ( iLenBy ); iCurPos = MESS_HEADER_LENGTH_BYTE; // start from beginning of data for ( i = 0; i < iLenBy; i++ ) { - vecData[i] = static_cast ( - GetValFromStream ( vecIn, iCurPos, 1 ) ); + vecbyMesBodyData[i] = static_cast ( + GetValFromStream ( vecbyData, iCurPos, 1 ) ); } return false; // no error diff --git a/src/protocol.h b/src/protocol.h index 41e775d6..e3454539 100755 --- a/src/protocol.h +++ b/src/protocol.h @@ -118,15 +118,26 @@ public: void CreateCLEmptyMes ( const CHostAddress& InetAddr ); void CreateCLDisconnection ( const CHostAddress& InetAddr ); - bool ParseMessage ( const CVector& vecbyData, - const int iNumBytes ); + bool ParseMessageFrame ( const CVector& vecbyData, + const int iNumBytesIn, + CVector& vecbyMesBodyData, + int& iRecCounter, + int& iRecID ); - bool ParseConnectionLessMessage ( const CVector& vecbyData, - const int iNumBytes, - const CHostAddress& InetAddr ); + bool ParseMessageBody ( const CVector& vecbyMesBodyData, + const int iRecCounter, + const int iRecID ); - bool IsProtocolMessage ( const CVector& vecbyData, - const int iNumBytes ); + bool ParseConnectionLessMessageWithFrame ( const CVector& vecbyData, + const int iNumBytesIn, + const CHostAddress& InetAddr ); + + bool ParseConnectionLessMessageBody ( const CVector& vecbyMesBodyData, + const int iRecID, + const CHostAddress& InetAddr ); + + bool IsConnectionLessMessageID ( const int iID ) const + { return (iID >= 1000) & (iID < 2000); } // this function is public because we need it in the test bench void CreateAndImmSendAcknMess ( const int& iID, @@ -160,12 +171,6 @@ protected: const int iCnt, const int iID ); - bool ParseMessageFrame ( const CVector& vecIn, - const int iNumBytesIn, - int& iCnt, - int& iID, - CVector& vecData ); - void GenMessageFrame ( CVector& vecOut, const int iCnt, const int iID, @@ -189,9 +194,6 @@ protected: const int iMaxStringLen, QString& strOut ); - bool IsConnectionLessMessageID ( const int iID ) const - { return (iID >= 1000) & (iID < 2000); } - void SendMessage(); void CreateAndSendMessage ( const int iID, @@ -280,9 +282,6 @@ signals: void CLReqServerList ( CHostAddress InetAddr ); void CLSendEmptyMes ( CHostAddress TargetInetAddr ); void CLDisconnection ( CHostAddress InetAddr ); - - void DetectedCLMessage ( CVector vecbyData, - int iNumBytes ); }; #endif /* !defined ( PROTOCOL_H__3B123453_4344_BB2392354455IUHF1912__INCLUDED_ ) */ diff --git a/src/server.cpp b/src/server.cpp index 54c56400..327963bf 100755 --- a/src/server.cpp +++ b/src/server.cpp @@ -537,15 +537,15 @@ void CServer::OnSendCLProtMessage ( CHostAddress InetAddr, Socket.SendPacket ( vecMessage, InetAddr ); } -void CServer::OnDetCLMess ( const CVector& vecbyData, - const int iNumBytes, +void CServer::OnDetCLMess ( const CVector& vecbyMesBodyData, + const int iRecID, const CHostAddress& InetAddr ) { // this is a special case: we received a connection less message but we are // in a connection - ConnLessProtocol.ParseConnectionLessMessage ( vecbyData, - iNumBytes, - InetAddr ); + ConnLessProtocol.ParseConnectionLessMessageBody ( vecbyMesBodyData, + iRecID, + InetAddr ); } void CServer::OnCLDisconnection ( CHostAddress InetAddr ) @@ -1195,9 +1195,9 @@ bool CServer::PutData ( const CVector& vecbyRecBuf, { // this is a new client, we then first check if this is a connection // less message before we create a new official channel - if ( ConnLessProtocol.ParseConnectionLessMessage ( vecbyRecBuf, - iNumBytesRead, - HostAdr ) ) + if ( ConnLessProtocol.ParseConnectionLessMessageWithFrame ( vecbyRecBuf, + iNumBytesRead, + HostAdr ) ) { // a new client is calling, look for free channel iCurChanID = GetFreeChan(); diff --git a/src/server.h b/src/server.h index 3f351ad8..d60ebaa1 100755 --- a/src/server.h +++ b/src/server.h @@ -265,8 +265,8 @@ public slots: void OnNewConnection ( int iChID ); void OnSendCLProtMessage ( CHostAddress InetAddr, CVector vecMessage ); - void OnDetCLMess ( const CVector& vecbyData, - const int iNumBytes, + void OnDetCLMess ( const CVector& vecbyMesBodyData, + const int iRecID, const CHostAddress& InetAddr ); void OnCLPingReceived ( CHostAddress InetAddr, int iMs ) @@ -325,18 +325,18 @@ public slots: void OnSendProtMessCh10 ( CVector mess ) { OnSendProtMessage ( 10, mess ); } void OnSendProtMessCh11 ( CVector mess ) { OnSendProtMessage ( 11, mess ); } - void OnDetCLMessCh0 ( CVector vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[0].GetAddress() ); } - void OnDetCLMessCh1 ( CVector vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[1].GetAddress() ); } - void OnDetCLMessCh2 ( CVector vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[2].GetAddress() ); } - void OnDetCLMessCh3 ( CVector vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[3].GetAddress() ); } - void OnDetCLMessCh4 ( CVector vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[4].GetAddress() ); } - void OnDetCLMessCh5 ( CVector vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[5].GetAddress() ); } - void OnDetCLMessCh6 ( CVector vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[6].GetAddress() ); } - void OnDetCLMessCh7 ( CVector vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[7].GetAddress() ); } - void OnDetCLMessCh8 ( CVector vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[8].GetAddress() ); } - void OnDetCLMessCh9 ( CVector vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[9].GetAddress() ); } - void OnDetCLMessCh10 ( CVector vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[10].GetAddress() ); } - void OnDetCLMessCh11 ( CVector vData, int iNBy ) { OnDetCLMess ( vData, iNBy, vecChannels[11].GetAddress() ); } + void OnDetCLMessCh0 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[0].GetAddress() ); } + void OnDetCLMessCh1 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[1].GetAddress() ); } + void OnDetCLMessCh2 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[2].GetAddress() ); } + void OnDetCLMessCh3 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[3].GetAddress() ); } + void OnDetCLMessCh4 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[4].GetAddress() ); } + void OnDetCLMessCh5 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[5].GetAddress() ); } + void OnDetCLMessCh6 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[6].GetAddress() ); } + void OnDetCLMessCh7 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[7].GetAddress() ); } + void OnDetCLMessCh8 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[8].GetAddress() ); } + void OnDetCLMessCh9 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[9].GetAddress() ); } + void OnDetCLMessCh10 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[10].GetAddress() ); } + void OnDetCLMessCh11 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[11].GetAddress() ); } void OnNewConnectionCh0() { OnNewConnection ( 0 ); } void OnNewConnectionCh1() { OnNewConnection ( 1 ); } diff --git a/src/socket.h b/src/socket.h index d3c3d521..f26ae4d3 100755 --- a/src/socket.h +++ b/src/socket.h @@ -90,41 +90,58 @@ signals: }; +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD /* Socket which runs in a separate high priority thread ----------------------*/ - -/* -// TEST - -// http://qt-project.org/forums/viewthread/14393 -// http://qt-project.org/doc/qt-5.0/qtcore/qthread.html#Priority-enum -// http://qt-project.org/wiki/Threads_Events_QObjects - -class CHighPrioSocket +class CHighPrioSocket : public QObject { + Q_OBJECT + public: CHighPrioSocket ( CChannel* pNewChannel, const quint16 iPortNumber ) { - // TEST - worker = new CSocket ( pNewChannel, iPortNumber ); - worker->moveToThread(&workerThread); - workerThread.start(QThread::TimeCriticalPriority); +// TEST we have to register some classes to the Qt signal/slot mechanism +// since now we actually have thread crossings with the new code +qRegisterMetaType > ( "CVector" ); +qRegisterMetaType ( "CHostAddress" ); + +// TEST actual creation of the new thread, see internet references below: +// http://qt-project.org/forums/viewthread/14393 +// http://qt-project.org/doc/qt-5.0/qtcore/qthread.html#Priority-enum +// http://qt-project.org/wiki/Threads_Events_QObjects + +pSocket = new CSocket ( pNewChannel, iPortNumber ); +pSocket->moveToThread ( &NetworkWorkerThread ); +NetworkWorkerThread.start ( QThread::TimeCriticalPriority ); + + + QObject::connect ( pSocket, + SIGNAL ( InvalidPacketReceived ( CVector, int, CHostAddress ) ), + SIGNAL ( InvalidPacketReceived ( CVector, int, CHostAddress ) ) ); + } + + virtual ~CHighPrioSocket() + { + NetworkWorkerThread.exit(); } void SendPacket ( const CVector& vecbySendBuf, const CHostAddress& HostAddr ) { - worker->SendPacket ( vecbySendBuf, HostAddr ); + pSocket->SendPacket ( vecbySendBuf, HostAddr ); } protected: + QThread NetworkWorkerThread; + CSocket* pSocket; - // TEST - QThread workerThread; - CSocket* worker; +signals: + void InvalidPacketReceived ( CVector vecbyRecBuf, + int iNumBytesRead, + CHostAddress RecHostAddr ); }; -*/ +#endif #endif /* !defined ( SOCKET_HOIHGE76GEKJH98_3_4344_BB23945IUHF1912__INCLUDED_ ) */