From 2e5f7781ce53b962af45a5c990e97c1fc9f86532 Mon Sep 17 00:00:00 2001 From: Volker Fischer Date: Sun, 16 Feb 2014 08:12:07 +0000 Subject: [PATCH] finished implementation of a separate socket high priority thread --- ChangeLog | 3 + Jamulus.pro | 3 +- src/channel.cpp | 200 +++++++++----------------------- src/channel.h | 59 +++++----- src/server.h | 76 ++++--------- src/socket.cpp | 297 ++++++++++++++++++++++++------------------------ src/socket.h | 122 +++++++++----------- 7 files changed, 314 insertions(+), 446 deletions(-) diff --git a/ChangeLog b/ChangeLog index 70f99504..917ca43b 100644 --- a/ChangeLog +++ b/ChangeLog @@ -5,6 +5,9 @@ - store fader solo state +- improved stability of the audio stream by reducing audio drop outs (by + using a separate socket thread) + - bug fix: the fader level could not be changed if fader was on solo 3.3.3 diff --git a/Jamulus.pro b/Jamulus.pro index 76d6d923..16a112c5 100755 --- a/Jamulus.pro +++ b/Jamulus.pro @@ -36,7 +36,8 @@ win32 { LIBS += ole32.lib \ user32.lib \ advapi32.lib \ - winmm.lib + winmm.lib \ + ws2_32.lib } else:macx { HEADERS += mac/sound.h SOURCES += mac/sound.cpp diff --git a/src/channel.cpp b/src/channel.cpp index bd9e124a..265f7550 100755 --- a/src/channel.cpp +++ b/src/channel.cpp @@ -109,14 +109,6 @@ QObject::connect ( &Protocol, QObject::connect ( &Protocol, SIGNAL ( ReqNetTranspProps() ), this, SLOT ( OnReqNetTranspProps() ) ); - - // this connection is intended for a thread transition if we have a - // separate socket thread running -#ifndef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD - QObject::connect ( this, - SIGNAL ( ParseMessageBody ( CVector, int, int ) ), - this, SLOT ( OnParseMessageBody ( CVector, int, int ) ) ); -#endif } bool CChannel::ProtocolIsEnabled() @@ -447,147 +439,82 @@ void CChannel::Disconnect() } } -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD -EPutDataStat CChannel::PutData ( const CVector& vecbyData, - const int iNumBytes, - CSocket* pSocket ) -#else -EPutDataStat CChannel::PutData ( const CVector& vecbyData, - const int iNumBytes ) -#endif +void CChannel::PutProtcolData ( const int iRecCounter, + const int iRecID, + const CVector& vecbyMesBodyData, + const CHostAddress& RecHostAddr ) { -/* - Note that this function might be called from a different thread (separate - Socket thread) and therefore we should not call functions which emit signals - themself directly but emit a signal here so that the thread transition is - done as early as possible. - This is the reason why "ParseMessageBody" is not called directly but through a - signal-slot mechanism. -*/ + // Only process protocol message if: + // - for client only: the packet comes from the server we want to talk to + // - the channel is enabled + // - the protocol mechanism is enabled + if ( ( bIsServer || ( GetAddress() == RecHostAddr ) ) && + IsEnabled() && + ProtocolIsEnabled() ) + { + // parse the message assuming this is a regular protocol message + Protocol.ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID ); + } +} +EPutDataStat CChannel::PutAudioData ( const CVector& vecbyData, + const int iNumBytes, + CHostAddress RecHostAddr ) +{ // init return state EPutDataStat eRet = PS_GEN_ERROR; - if ( bIsEnabled ) + // Only process audio data if: + // - for client only: the packet comes from the server we want to talk to + // - the channel is enabled + if ( ( bIsServer || ( GetAddress() == RecHostAddr ) ) && + IsEnabled() ) { - int iRecCounter; - int iRecID; - CVector vecbyMesBodyData; - - // init flag - bool bNewConnection = false; - - // check if this is a protocol message by trying to parse the message - // frame - if ( !Protocol.ParseMessageFrame ( vecbyData, - iNumBytes, - vecbyMesBodyData, - iRecCounter, - iRecID ) ) + MutexSocketBuf.lock(); { - // This is a protocol message: - - // only use protocol data if protocol mechanism is enabled - if ( ProtocolIsEnabled() ) + // only process audio if packet has correct size + if ( iNumBytes == ( iNetwFrameSize * iNetwFrameSizeFact ) ) { - // in case this is a connection less message, we do not process it here - if ( Protocol.IsConnectionLessMessageID ( iRecID ) ) + // store new packet in jitter buffer + if ( SockBuf.Put ( vecbyData, iNumBytes ) ) { - // fire a signal so that an other class can process this type of - // message -// TODO a copy of the vector is used -> avoid malloc in real-time routine -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD - pSocket->EmitDetectedCLMessage ( vecbyMesBodyData, iRecID ); -#else - emit DetectedCLMessage ( vecbyMesBodyData, iRecID ); -#endif - - // set status flag - eRet = PS_PROT_OK; + eRet = PS_AUDIO_OK; } else { - // parse the message assuming this is a regular protocol message -// TODO a copy of the vector is used -> avoid malloc in real-time routine -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD - pSocket->EmitParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID ); -#else - emit ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID ); -#endif - - // note that protocol OK is not correct here since we do not - // check if the protocol was ok since we emit just a signal - // and do not get any feedback on the protocol decoding state - eRet = PS_PROT_OK; + eRet = PS_AUDIO_ERR; } } else { - // In case we are the server and the current channel is not - // connected, we do not evaluate protocol messages but these - // messages could start the server which is not desired, - // especially not for the disconnect messages. - // We now do not start the server if a valid protocol message - // was received but only start the server on audio packets. - - // set status flag - eRet = PS_PROT_OK_MESS_NOT_EVALUATED; + // the protocol parsing failed and this was no audio block, + // we treat this as protocol error (unkown packet) + eRet = PS_PROT_ERR; } - } - else - { - // This seems to be an audio packet (only try to parse audio if it - // was not a protocol packet): - MutexSocketBuf.lock(); + // All network packets except of valid protocol messages + // regardless if they are valid or invalid audio packets lead to + // a state change to a connected channel. + // This is because protocol messages can only be sent on a + // connected channel and the client has to inform the server + // about the audio packet properties via the protocol. + + // check if channel was not connected, this is a new connection + if ( !IsConnected() ) { - // only process audio if packet has correct size - if ( iNumBytes == ( iNetwFrameSize * iNetwFrameSizeFact ) ) - { - // store new packet in jitter buffer - if ( SockBuf.Put ( vecbyData, iNumBytes ) ) - { - eRet = PS_AUDIO_OK; - } - else - { - eRet = PS_AUDIO_ERR; - } - } - else - { - // the protocol parsing failed and this was no audio block, - // we treat this as protocol error (unkown packet) - eRet = PS_PROT_ERR; - } - - // All network packets except of valid protocol messages - // regardless if they are valid or invalid audio packets lead to - // a state change to a connected channel. - // This is because protocol messages can only be sent on a - // connected channel and the client has to inform the server - // about the audio packet properties via the protocol. - - // check if channel was not connected, this is a new connection - // (do not fire an event directly since we are inside a mutex - // region -> to avoid a dead-lock) - bNewConnection = !IsConnected(); - - // reset time-out counter - ResetTimeOutCounter(); + // overwrite status + eRet = PS_NEW_CONNECTION; } - MutexSocketBuf.unlock(); - } - if ( bNewConnection ) - { - // inform other objects that new connection was established -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD - pSocket->EmitNewConnection(); -#else - emit NewConnection(); -#endif + // reset time-out counter (note that this must be done after the + // "IsConnected()" query above) + ResetTimeOutCounter(); } + MutexSocketBuf.unlock(); + } + else + { + eRet = PS_AUDIO_INVALID; } return eRet; @@ -657,24 +584,7 @@ EGetDataStat CChannel::GetData ( CVector& vecbyData, return eGetStatus; } -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD -void CChannel::PrepAndSendPacketHPS ( CHighPrioSocket* pSocket, - const CVector& vecbyNPacket, - const int iNPacketLen ) -{ -// TODO Doubled code!!! Same code as in PrepAndSendPacket (see below)!!! - QMutexLocker locker ( &MutexConvBuf ); - - // use conversion buffer to convert sound card block size in network - // block size - if ( ConvBuf.Put ( vecbyNPacket, iNPacketLen ) ) - { - pSocket->SendPacket ( ConvBuf.Get(), GetAddress() ); - } -} -#endif - -void CChannel::PrepAndSendPacket ( CSocket* pSocket, +void CChannel::PrepAndSendPacket ( CHighPrioSocket* pSocket, const CVector& vecbyNPacket, const int iNPacketLen ) { diff --git a/src/channel.h b/src/channel.h index 4260149c..3a98a5eb 100755 --- a/src/channel.h +++ b/src/channel.h @@ -47,9 +47,11 @@ enum EPutDataStat PS_GEN_ERROR, PS_AUDIO_OK, PS_AUDIO_ERR, + PS_AUDIO_INVALID, PS_PROT_OK, PS_PROT_OK_MESS_NOT_EVALUATED, - PS_PROT_ERR + PS_PROT_ERR, + PS_NEW_CONNECTION }; @@ -63,28 +65,22 @@ public: // use constructor initialization in the server for a vector of channels CChannel ( const bool bNIsServer = true ); -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD - EPutDataStat PutData ( const CVector& vecbyData, - const int iNumBytes, - CSocket* pSocket = 0 ); // TODO remove the "= 0"! -#else - EPutDataStat PutData ( const CVector& vecbyData, - const int iNumBytes ); -#endif + void PutProtcolData ( const int iRecCounter, + const int iRecID, + const CVector& vecbyMesBodyData, + const CHostAddress& RecHostAddr ); + + EPutDataStat PutAudioData ( const CVector& vecbyData, + const int iNumBytes, + CHostAddress RecHostAddr ); + EGetDataStat GetData ( CVector& vecbyData, const int iNumBytes ); -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD - void PrepAndSendPacketHPS ( CHighPrioSocket* pSocket, - const CVector& vecbyNPacket, - const int iNPacketLen ); -#endif - - void PrepAndSendPacket ( CSocket* pSocket, + void PrepAndSendPacket ( CHighPrioSocket* pSocket, const CVector& vecbyNPacket, const int iNPacketLen ); - void ResetTimeOutCounter() { iConTimeOut = iConTimeOutStartVal; } bool IsConnected() const { return iConTimeOut > 0; } void Disconnect(); @@ -235,18 +231,22 @@ public slots: Protocol.ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID ); } -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD -void OnDetectedCLMessage ( CVector vecbyMesBodyData, - int iRecID ) -{ - emit DetectedCLMessage ( vecbyMesBodyData, iRecID ); -} + void OnProtcolMessageReceived ( int iRecCounter, + int iRecID, + CVector vecbyMesBodyData, + CHostAddress RecHostAddr ) + { + PutProtcolData ( iRecCounter, iRecID, vecbyMesBodyData, RecHostAddr ); + } -void OnNewConnection() -{ - emit NewConnection(); -} -#endif + void OnProtcolCLMessageReceived ( int iRecID, + CVector vecbyMesBodyData, + CHostAddress RecHostAddr ) + { + emit DetectedCLMessage ( vecbyMesBodyData, iRecID, RecHostAddr ); + } + + void OnNewConnection() { emit NewConnection(); } signals: void MessReadyForSending ( CVector vecMessage ); @@ -265,7 +265,8 @@ signals: void Disconnected(); void DetectedCLMessage ( CVector vecbyMesBodyData, - int iRecID ); + int iRecID, + CHostAddress RecHostAddr ); void ParseMessageBody ( CVector vecbyMesBodyData, int iRecCounter, diff --git a/src/server.h b/src/server.h index 98e87ab3..9b7a9192 100755 --- a/src/server.h +++ b/src/server.h @@ -130,9 +130,10 @@ public: void Stop(); bool IsRunning() { return HighPrecisionTimer.isActive(); } - bool PutData ( const CVector& vecbyRecBuf, - const int iNumBytesRead, - const CHostAddress& HostAdr ); + bool PutAudioData ( const CVector& vecbyRecBuf, + const int iNumBytesRead, + const CHostAddress& HostAdr, + int& iCurChanID ); void GetConCliParam ( CVector& vecHostAddresses, CVector& vecsName, @@ -242,7 +243,7 @@ protected: CVector vecbyCodedData; // actual working objects - CSocket Socket; + CHighPrioSocket Socket; // logging CServerLogging Logging; @@ -269,13 +270,26 @@ signals: public slots: void OnTimer(); - void OnSendProtMessage ( int iChID, CVector vecMessage ); - void OnNewConnection ( int iChID ); - void OnSendCLProtMessage ( CHostAddress InetAddr, CVector vecMessage ); - void OnDetCLMess ( const CVector& vecbyMesBodyData, - const int iRecID, - const CHostAddress& InetAddr ); + void OnSendProtMessage ( int iChID, + CVector vecMessage ); + + void OnNewConnection ( int iChID, + CHostAddress RecHostAddr ); + + void OnServerFull ( CHostAddress RecHostAddr ); + + void OnSendCLProtMessage ( CHostAddress InetAddr, + CVector vecMessage ); + + void OnProtcolCLMessageReceived ( int iRecID, + CVector vecbyMesBodyData, + CHostAddress RecHostAddr ); + + void OnProtcolMessageReceived ( int iRecCounter, + int iRecID, + CVector vecbyMesBodyData, + CHostAddress RecHostAddr ); void OnCLPingReceived ( CHostAddress InetAddr, int iMs ) { ConnLessProtocol.CreateCLPingMes ( InetAddr, iMs ); } @@ -341,48 +355,6 @@ public slots: void OnSendProtMessCh18 ( CVector mess ) { OnSendProtMessage ( 18, mess ); } void OnSendProtMessCh19 ( CVector mess ) { OnSendProtMessage ( 19, mess ); } - void OnDetCLMessCh0 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[0].GetAddress() ); } - void OnDetCLMessCh1 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[1].GetAddress() ); } - void OnDetCLMessCh2 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[2].GetAddress() ); } - void OnDetCLMessCh3 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[3].GetAddress() ); } - void OnDetCLMessCh4 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[4].GetAddress() ); } - void OnDetCLMessCh5 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[5].GetAddress() ); } - void OnDetCLMessCh6 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[6].GetAddress() ); } - void OnDetCLMessCh7 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[7].GetAddress() ); } - void OnDetCLMessCh8 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[8].GetAddress() ); } - void OnDetCLMessCh9 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[9].GetAddress() ); } - void OnDetCLMessCh10 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[10].GetAddress() ); } - void OnDetCLMessCh11 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[11].GetAddress() ); } - void OnDetCLMessCh12 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[12].GetAddress() ); } - void OnDetCLMessCh13 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[13].GetAddress() ); } - void OnDetCLMessCh14 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[14].GetAddress() ); } - void OnDetCLMessCh15 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[15].GetAddress() ); } - void OnDetCLMessCh16 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[16].GetAddress() ); } - void OnDetCLMessCh17 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[17].GetAddress() ); } - void OnDetCLMessCh18 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[18].GetAddress() ); } - void OnDetCLMessCh19 ( CVector vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[19].GetAddress() ); } - - void OnNewConnectionCh0() { OnNewConnection ( 0 ); } - void OnNewConnectionCh1() { OnNewConnection ( 1 ); } - void OnNewConnectionCh2() { OnNewConnection ( 2 ); } - void OnNewConnectionCh3() { OnNewConnection ( 3 ); } - void OnNewConnectionCh4() { OnNewConnection ( 4 ); } - void OnNewConnectionCh5() { OnNewConnection ( 5 ); } - void OnNewConnectionCh6() { OnNewConnection ( 6 ); } - void OnNewConnectionCh7() { OnNewConnection ( 7 ); } - void OnNewConnectionCh8() { OnNewConnection ( 8 ); } - void OnNewConnectionCh9() { OnNewConnection ( 9 ); } - void OnNewConnectionCh10() { OnNewConnection ( 10 ); } - void OnNewConnectionCh11() { OnNewConnection ( 11 ); } - void OnNewConnectionCh12() { OnNewConnection ( 12 ); } - void OnNewConnectionCh13() { OnNewConnection ( 13 ); } - void OnNewConnectionCh14() { OnNewConnection ( 14 ); } - void OnNewConnectionCh15() { OnNewConnection ( 15 ); } - void OnNewConnectionCh16() { OnNewConnection ( 16 ); } - void OnNewConnectionCh17() { OnNewConnection ( 17 ); } - void OnNewConnectionCh18() { OnNewConnection ( 18 ); } - void OnNewConnectionCh19() { OnNewConnection ( 19 ); } - void OnReqConnClientsListCh0() { CreateAndSendChanListForThisChan ( 0 ); } void OnReqConnClientsListCh1() { CreateAndSendChanListForThisChan ( 1 ); } void OnReqConnClientsListCh2() { CreateAndSendChanListForThisChan ( 2 ); } diff --git a/src/socket.cpp b/src/socket.cpp index 142b18e2..963d5fff 100755 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -29,20 +29,26 @@ /* Implementation *************************************************************/ void CSocket::Init ( const quint16 iPortNumber ) { -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD -# ifdef _WIN32 +#ifdef _WIN32 // for the Windows socket usage we have to start it up first + +// TODO check for error and exit application on error + WSADATA wsa; - WSAStartup ( MAKEWORD(1, 0), &wsa ); // TODO check for error and exit application on error -# endif + WSAStartup ( MAKEWORD(1, 0), &wsa ); +#endif // create the UDP socket UdpSocket = socket ( AF_INET, SOCK_DGRAM, 0 ); -#endif // allocate memory for network receive and send buffer in samples vecbyRecBuf.Init ( MAX_SIZE_BYTES_NETW_BUF ); + // preinitialize socket in address (only the port number is missing) + sockaddr_in UdpSocketInAddr; + UdpSocketInAddr.sin_family = AF_INET; + UdpSocketInAddr.sin_addr.s_addr = INADDR_ANY; + // initialize the listening socket bool bSuccess; @@ -55,12 +61,6 @@ void CSocket::Init ( const quint16 iPortNumber ) quint16 iClientPortIncrement = 10; // start value: port nubmer plus ten bSuccess = false; // initialization for while loop -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD - // preinitialize socket in address (only the port number is missing) - sockaddr_in UdpSocketInAddr; - UdpSocketInAddr.sin_family = AF_INET; - UdpSocketInAddr.sin_addr.s_addr = INADDR_ANY; - while ( !bSuccess && ( iClientPortIncrement <= NUM_SOCKET_PORTS_TO_TRY ) ) { @@ -72,25 +72,17 @@ void CSocket::Init ( const quint16 iPortNumber ) iClientPortIncrement++; } -#else - while ( !bSuccess && - ( iClientPortIncrement <= NUM_SOCKET_PORTS_TO_TRY ) ) - { - bSuccess = SocketDevice.bind ( - QHostAddress ( QHostAddress::Any ), - iPortNumber + iClientPortIncrement ); - - iClientPortIncrement++; - } -#endif } else { // for the server, only try the given port number and do not try out // other port numbers to bind since it is imporatant that the server // gets the desired port number - bSuccess = SocketDevice.bind ( - QHostAddress ( QHostAddress::Any ), iPortNumber ); + UdpSocketInAddr.sin_port = htons ( iPortNumber ); + + bSuccess = ( bind ( UdpSocket , + (sockaddr*) &UdpSocketInAddr, + sizeof ( sockaddr_in ) ) == 0 ); } if ( !bSuccess ) @@ -100,67 +92,66 @@ void CSocket::Init ( const quint16 iPortNumber ) "the software is already running).", "Network Error" ); } - // connect the "activated" signal -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + + // Connections ------------------------------------------------------------- + // it is important to do the following connections in this class since we + // have a thread transition + + // we have different connections for client and server if ( bIsClient ) { -// TEST We do a test where we call "waitForReadyRead" instead of even driven method. -/* - // We have to use a blocked queued connection since in case we use a - // separate socket thread, the "readyRead" signal would occur and our - // "OnDataReceived" function would be run in another thread. This could - // lead to a situation that a new "readRead" occurs while the processing - // of the previous signal was not finished -> the error: "Multiple - // socket notifiers for same socket" may occur. - QObject::connect ( &SocketDevice, SIGNAL ( readyRead() ), - this, SLOT ( OnDataReceived() ), Qt::BlockingQueuedConnection ); -*/ + // client connections: + QObject::connect ( this, + SIGNAL ( ProtcolMessageReceived ( int, int, CVector, CHostAddress ) ), + pChannel, SLOT ( OnProtcolMessageReceived ( int, int, CVector, CHostAddress ) ) ); -// TEST -QObject::connect ( this, - SIGNAL ( ParseMessageBody ( CVector, int, int ) ), - pChannel, SLOT ( OnParseMessageBody ( CVector, int, int ) ) ); - -QObject::connect ( this, - SIGNAL ( DetectedCLMessage ( CVector, int ) ), - pChannel, SLOT ( OnDetectedCLMessage ( CVector, int ) ) ); - -QObject::connect ( this, - SIGNAL ( NewConnection ( CVector, int ) ), - pChannel, SLOT ( OnNewConnection ( CVector, int ) ) ); + QObject::connect ( this, + SIGNAL ( ProtcolCLMessageReceived ( int, CVector, CHostAddress ) ), + pChannel, SLOT ( OnProtcolCLMessageReceived ( int, CVector, CHostAddress ) ) ); + QObject::connect ( this, + SIGNAL ( NewConnection() ), + pChannel, SLOT ( OnNewConnection() ) ); } else { - // the server does not use a separate socket thread right now, in that - // case we must not use the blocking queued connection, otherwise we - // would get a dead lock - QObject::connect ( &SocketDevice, SIGNAL ( readyRead() ), - this, SLOT ( OnDataReceived() ) ); + // server connections: + + QObject::connect ( this, + SIGNAL ( ProtcolMessageReceived ( int, int, CVector, CHostAddress ) ), + pServer, SLOT ( OnProtcolMessageReceived ( int, int, CVector, CHostAddress ) ) ); + + QObject::connect ( this, + SIGNAL ( ProtcolCLMessageReceived ( int, CVector, CHostAddress ) ), + pServer, SLOT ( OnProtcolCLMessageReceived ( int, CVector, CHostAddress ) ) ); + + QObject::connect ( this, + SIGNAL ( NewConnection ( int, CHostAddress ) ), + pServer, SLOT ( OnNewConnection ( int, CHostAddress ) ) ); + + QObject::connect ( this, + SIGNAL ( ServerFull ( CHostAddress ) ), + pServer, SLOT ( OnServerFull ( CHostAddress ) ) ); } -#else - QObject::connect ( &SocketDevice, SIGNAL ( readyRead() ), - this, SLOT ( OnDataReceived() ) ); -#endif } -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD void CSocket::Close() { // closesocket will cause recvfrom to return with an error because the // socket is closed -> then the thread can safely be shut down +#ifdef _WIN32 closesocket ( UdpSocket ); -} +#else + close ( UdpSocket ); #endif +} CSocket::~CSocket() { -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD -# ifdef _WIN32 +#ifdef _WIN32 // the Windows socket must be cleanup on shutdown WSACleanup(); -# endif #endif } @@ -177,34 +168,18 @@ void CSocket::SendPacket ( const CVector& vecbySendBuf, // char vector in "const char*", for this we first convert the const // uint8_t vector in a read/write uint8_t vector and then do the cast to // const char*) -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD - // note that the client uses the socket directly for performance reasons - if ( bIsClient ) - { - sockaddr_in UdpSocketOutAddr; + sockaddr_in UdpSocketOutAddr; - UdpSocketOutAddr.sin_family = AF_INET; - UdpSocketOutAddr.sin_port = htons ( HostAddr.iPort ); - UdpSocketOutAddr.sin_addr.s_addr = htonl ( HostAddr.InetAddr.toIPv4Address() ); + UdpSocketOutAddr.sin_family = AF_INET; + UdpSocketOutAddr.sin_port = htons ( HostAddr.iPort ); + UdpSocketOutAddr.sin_addr.s_addr = htonl ( HostAddr.InetAddr.toIPv4Address() ); - sendto ( UdpSocket, - (const char*) &( (CVector) vecbySendBuf )[0], - iVecSizeOut, - 0, - (sockaddr*) &UdpSocketOutAddr, - sizeof ( sockaddr_in ) ); - } - else - { -#endif - SocketDevice.writeDatagram ( - (const char*) &( (CVector) vecbySendBuf )[0], - iVecSizeOut, - HostAddr.InetAddr, - HostAddr.iPort ); -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD - } -#endif + sendto ( UdpSocket, + (const char*) &( (CVector) vecbySendBuf )[0], + iVecSizeOut, + 0, + (sockaddr*) &UdpSocketOutAddr, + sizeof ( sockaddr_in ) ); } } @@ -225,88 +200,107 @@ bool CSocket::GetAndResetbJitterBufferOKFlag() void CSocket::OnDataReceived() { -#ifndef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD - while ( SocketDevice.hasPendingDatagrams() ) -#endif - { - // read block from network interface and query address of sender -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD - sockaddr_in SenderAddr; +/* + The strategy of this function is that only the "put audio" function is + called directly (i.e. the high thread priority is used) and all other less + important things like protocol parsing and acting on protocol messages is + done in the low priority thread. To get a thread transition, we have to + use the signal/slot mechanism (i.e. we use messages for that). +*/ + + // read block from network interface and query address of sender + sockaddr_in SenderAddr; #ifdef _WIN32 - int SenderAddrSize = sizeof ( sockaddr_in ); + int SenderAddrSize = sizeof ( sockaddr_in ); #else - socklen_t SenderAddrSize = sizeof ( sockaddr_in ); + socklen_t SenderAddrSize = sizeof ( sockaddr_in ); #endif - const long iNumBytesRead = recvfrom ( UdpSocket, - (char*) &vecbyRecBuf[0], - MAX_SIZE_BYTES_NETW_BUF, - 0, - (sockaddr*) &SenderAddr, - &SenderAddrSize ); -#else - const int iNumBytesRead = - SocketDevice.readDatagram ( (char*) &vecbyRecBuf[0], - MAX_SIZE_BYTES_NETW_BUF, - &SenderAddress, - &SenderPort ); -#endif + const long iNumBytesRead = recvfrom ( UdpSocket, + (char*) &vecbyRecBuf[0], + MAX_SIZE_BYTES_NETW_BUF, + 0, + (sockaddr*) &SenderAddr, + &SenderAddrSize ); - // check if an error occurred or no data could be read - if ( iNumBytesRead <= 0 ) + // check if an error occurred or no data could be read + if ( iNumBytesRead <= 0 ) + { + return; + } + + // convert address of client + RecHostAddr.InetAddr.setAddress ( ntohl ( SenderAddr.sin_addr.s_addr ) ); + RecHostAddr.iPort = ntohs ( SenderAddr.sin_port ); + + + // check if this is a protocol message + int iRecCounter; + int iRecID; + CVector vecbyMesBodyData; + + if ( !CProtocol::ParseMessageFrame ( vecbyRecBuf, + iNumBytesRead, + vecbyMesBodyData, + iRecCounter, + iRecID ) ) + { + // this is a protocol message, check the type of the message + if ( CProtocol::IsConnectionLessMessageID ( iRecID ) ) { - return; + +// TODO a copy of the vector is used -> avoid malloc in real-time routine + + emit ProtcolCLMessageReceived ( iRecID, vecbyMesBodyData, RecHostAddr ); } + else + { - // convert address of client -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD - RecHostAddr.InetAddr.setAddress ( ntohl ( SenderAddr.sin_addr.s_addr ) ); - RecHostAddr.iPort = ntohs ( SenderAddr.sin_port ); -#else - RecHostAddr.InetAddr = SenderAddress; - RecHostAddr.iPort = SenderPort; -#endif +// TODO a copy of the vector is used -> avoid malloc in real-time routine + emit ProtcolMessageReceived ( iRecCounter, iRecID, vecbyMesBodyData, RecHostAddr ); + } + } + else + { + // this is most probably a regular audio packet if ( bIsClient ) { // client: - // check if packet comes from the server we want to connect and that - // the channel is enabled - if ( ( pChannel->GetAddress() == RecHostAddr ) && - pChannel->IsEnabled() ) + switch ( pChannel->PutAudioData ( vecbyRecBuf, iNumBytesRead, RecHostAddr ) ) { - // this network packet is valid, put it in the channel -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD - switch ( pChannel->PutData ( vecbyRecBuf, iNumBytesRead, this ) ) -#else - switch ( pChannel->PutData ( vecbyRecBuf, iNumBytesRead ) ) -#endif - { - case PS_AUDIO_ERR: - case PS_GEN_ERROR: - bJitterBufferOK = false; - break; + case PS_AUDIO_ERR: + case PS_GEN_ERROR: + bJitterBufferOK = false; + break; - default: - // do nothing - break; - } - } - else - { + case PS_NEW_CONNECTION: + // inform other objects that new connection was established + emit NewConnection(); + break; + + case PS_AUDIO_INVALID: // inform about received invalid packet by fireing an event - emit InvalidPacketReceived ( vecbyRecBuf, - iNumBytesRead, - RecHostAddr ); + emit InvalidPacketReceived ( RecHostAddr ); + break; + + default: + // do nothing + break; } } else { // server: - if ( pServer->PutData ( vecbyRecBuf, iNumBytesRead, RecHostAddr ) ) + int iCurChanID; + + if ( pServer->PutAudioData ( vecbyRecBuf, iNumBytesRead, RecHostAddr, iCurChanID ) ) { + // we have a new connection, emit a signal + emit NewConnection ( iCurChanID, RecHostAddr ); + // this was an audio packet, start server if it is in sleep mode if ( !pServer->IsRunning() ) { @@ -315,6 +309,13 @@ void CSocket::OnDataReceived() new CCustomEvent ( MS_PACKET_RECEIVED, 0, 0 ) ); } } + + // check if no channel is available + if ( iCurChanID == INVALID_CHANNEL_ID ) + { + // fire message for the state that no free channel is available + emit ServerFull ( RecHostAddr ); + } } } } diff --git a/src/socket.h b/src/socket.h index 4e3c8df1..e830017e 100755 --- a/src/socket.h +++ b/src/socket.h @@ -27,21 +27,18 @@ #include #include -#include -#include #include #include #include #include "global.h" #include "protocol.h" #include "util.h" -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD -# ifndef _WIN32 -# include -# include -# endif +#ifndef _WIN32 +# include +# include #endif + // The header files channel.h and server.h require to include this header file // so we get a cyclic dependency. To solve this issue, a prototype of the // channel class and server class is defined here. @@ -55,7 +52,7 @@ class CChannel; // forward declaration of CChannel /* Classes ********************************************************************/ -/* Base socket class ---------------------------------------------------------*/ +/* Base socket class -------------------------------------------------------- */ class CSocket : public QObject { Q_OBJECT @@ -79,40 +76,16 @@ public: const CHostAddress& HostAddr ); bool GetAndResetbJitterBufferOKFlag(); - -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD void Close(); - void EmitDetectedCLMessage ( const CVector& vecbyMesBodyData, - const int iRecID ) - { - emit DetectedCLMessage ( vecbyMesBodyData, iRecID ); - } - - void EmitParseMessageBody ( const CVector& vecbyMesBodyData, - const int iRecCounter, - const int iRecID ) - { - emit ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID ); - } - - void EmitNewConnection() - { - emit NewConnection(); - } -#endif - protected: void Init ( const quint16 iPortNumber = LLCON_DEFAULT_PORT_NUMBER ); -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD -# ifdef _WIN32 +#ifdef _WIN32 SOCKET UdpSocket; -# else +#else int UdpSocket; -# endif #endif - QUdpSocket SocketDevice; QMutex Mutex; @@ -132,55 +105,56 @@ public slots: void OnDataReceived(); signals: -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD - void DetectedCLMessage ( CVector vecbyMesBodyData, - int iRecID ); + void NewConnection(); // for the client - void ParseMessageBody ( CVector vecbyMesBodyData, - int iRecCounter, - int iRecID ); + void NewConnection ( int iChID, + CHostAddress RecHostAddr ); // for the server - void NewConnection(); -#endif - void InvalidPacketReceived ( CVector vecbyRecBuf, - int iNumBytesRead, - CHostAddress RecHostAddr ); + void ServerFull ( CHostAddress RecHostAddr ); + + void InvalidPacketReceived ( CHostAddress RecHostAddr ); + + void ProtcolMessageReceived ( int iRecCounter, + int iRecID, + CVector vecbyMesBodyData, + CHostAddress HostAdr ); + + void ProtcolCLMessageReceived ( int iRecID, + CVector vecbyMesBodyData, + CHostAddress HostAdr ); }; -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD -/* Socket which runs in a separate high priority thread ----------------------*/ +/* Socket which runs in a separate high priority thread --------------------- */ +// The receive socket should be put in a high priority thread to ensure the GUI +// does not effect the stability of the audio stream (e.g. if the GUI is on +// high load because of a table update, the incoming network packets must still +// be put in the jitter buffer with highest priority). class CHighPrioSocket : public QObject { Q_OBJECT public: CHighPrioSocket ( CChannel* pNewChannel, - const quint16 iPortNumber ) : - Socket ( pNewChannel, iPortNumber ) - { - // Creation of the new socket thread which has to have the highest - // possible thread priority to make sure the jitter buffer is reliably - // filled with the network audio packets and does not get interrupted - // by other GUI threads. The following code is based on: - // http://qt-project.org/wiki/Threads_Events_QObjects - Socket.moveToThread ( &NetworkWorkerThread ); + const quint16 iPortNumber ) + : Socket ( pNewChannel, iPortNumber ) { Init(); } - NetworkWorkerThread.SetSocket ( &Socket ); - - NetworkWorkerThread.start ( QThread::TimeCriticalPriority ); - - // connect the "InvalidPacketReceived" signal - QObject::connect ( &Socket, - SIGNAL ( InvalidPacketReceived ( CVector, int, CHostAddress ) ), - SIGNAL ( InvalidPacketReceived ( CVector, int, CHostAddress ) ) ); - } + CHighPrioSocket ( CServer* pNewServer, + const quint16 iPortNumber ) + : Socket ( pNewServer, iPortNumber ) { Init(); } virtual ~CHighPrioSocket() { NetworkWorkerThread.exit(); } + void Start() + { + // starts the high priority socket receive thread (with using blocking + // socket request call) + NetworkWorkerThread.start ( QThread::TimeCriticalPriority ); + } + void SendPacket ( const CVector& vecbySendBuf, const CHostAddress& HostAddr ) { @@ -232,14 +206,20 @@ protected: bool bRun; }; + void Init() + { + // Creation of the new socket thread which has to have the highest + // possible thread priority to make sure the jitter buffer is reliably + // filled with the network audio packets and does not get interrupted + // by other GUI threads. The following code is based on: + // http://qt-project.org/wiki/Threads_Events_QObjects + Socket.moveToThread ( &NetworkWorkerThread ); + + NetworkWorkerThread.SetSocket ( &Socket ); + } + CSocketThread NetworkWorkerThread; CSocket Socket; - -signals: - void InvalidPacketReceived ( CVector vecbyRecBuf, - int iNumBytesRead, - CHostAddress RecHostAddr ); }; -#endif #endif /* !defined ( SOCKET_HOIHGE76GEKJH98_3_4344_BB23945IUHF1912__INCLUDED_ ) */