From 9ca6df618f432ce627a2b26300861b740500992b Mon Sep 17 00:00:00 2001 From: Volker Fischer Date: Tue, 11 Feb 2014 17:35:39 +0000 Subject: [PATCH] separate socket thread works now fine under Windows (only for Windows right now) --- src/channel.cpp | 21 ++++++++++ src/channel.h | 14 +++++++ src/socket.cpp | 108 +++++++++++++++++++++++++++++++++++++++++++++--- src/socket.h | 20 ++++++--- 4 files changed, 152 insertions(+), 11 deletions(-) diff --git a/src/channel.cpp b/src/channel.cpp index ed5dd6c9..dd76b837 100755 --- a/src/channel.cpp +++ b/src/channel.cpp @@ -112,9 +112,11 @@ QObject::connect ( &Protocol, // this connection is intended for a thread transition if we have a // separate socket thread running +#ifndef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD QObject::connect ( this, SIGNAL ( ParseMessageBody ( CVector, int, int ) ), this, SLOT ( OnParseMessageBody ( CVector, int, int ) ) ); +#endif } bool CChannel::ProtocolIsEnabled() @@ -437,8 +439,14 @@ void CChannel::Disconnect() } } +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD +EPutDataStat CChannel::PutData ( const CVector& vecbyData, + const int iNumBytes, + CSocket* pSocket ) +#else EPutDataStat CChannel::PutData ( const CVector& vecbyData, const int iNumBytes ) +#endif { /* Note that this function might be called from a different thread (separate @@ -480,7 +488,11 @@ EPutDataStat CChannel::PutData ( const CVector& vecbyData, // fire a signal so that an other class can process this type of // message // TODO a copy of the vector is used -> avoid malloc in real-time routine +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + emit pSocket->DetectedCLMessage ( vecbyMesBodyData, iRecID ); +#else emit DetectedCLMessage ( vecbyMesBodyData, iRecID ); +#endif // set status flag eRet = PS_PROT_OK; @@ -489,7 +501,11 @@ EPutDataStat CChannel::PutData ( const CVector& vecbyData, { // parse the message assuming this is a regular protocol message // TODO a copy of the vector is used -> avoid malloc in real-time routine +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + emit pSocket->ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID ); +#else emit ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID ); +#endif // note that protocol OK is not correct here since we do not // check if the protocol was ok since we emit just a signal @@ -558,6 +574,11 @@ EPutDataStat CChannel::PutData ( const CVector& vecbyData, if ( bNewConnection ) { // inform other objects that new connection was established + +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD +// TODO socket thread??? +#endif + emit NewConnection(); } } diff --git a/src/channel.h b/src/channel.h index 34021572..d36042ea 100755 --- a/src/channel.h +++ b/src/channel.h @@ -63,8 +63,14 @@ public: // use constructor initialization in the server for a vector of channels CChannel ( const bool bNIsServer = true ); +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + EPutDataStat PutData ( const CVector& vecbyData, + const int iNumBytes, + CSocket* pSocket = 0 ); // TODO remove the "= 0"! +#else EPutDataStat PutData ( const CVector& vecbyData, const int iNumBytes ); +#endif EGetDataStat GetData ( CVector& vecbyData, const int iNumBytes ); @@ -227,6 +233,14 @@ public slots: Protocol.ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID ); } +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD +void OnDetectedCLMessage ( CVector vecbyMesBodyData, + int iRecID ) +{ + emit DetectedCLMessage ( vecbyMesBodyData, iRecID ); +} +#endif + signals: void MessReadyForSending ( CVector vecMessage ); void NewConnection(); diff --git a/src/socket.cpp b/src/socket.cpp index becb8b68..eaa53137 100755 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -29,6 +29,17 @@ /* Implementation *************************************************************/ void CSocket::Init ( const quint16 iPortNumber ) { +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD +# ifdef _WIN32 + // for the Windows socket usage we have to start it up first + WSADATA wsa; + WSAStartup ( MAKEWORD(1, 0), &wsa ); // TODO check for error and exit application on error +# endif + + // create the UDP socket + UdpSocket = socket ( AF_INET, SOCK_DGRAM, 0 ); +#endif + // allocate memory for network receive and send buffer in samples vecbyRecBuf.Init ( MAX_SIZE_BYTES_NETW_BUF ); @@ -44,6 +55,24 @@ void CSocket::Init ( const quint16 iPortNumber ) quint16 iClientPortIncrement = 10; // start value: port nubmer plus ten bSuccess = false; // initialization for while loop +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + // preinitialize socket in address (only the port number is missing) + SOCKADDR_IN UdpSocketInAddr; + UdpSocketInAddr.sin_family = AF_INET; + UdpSocketInAddr.sin_addr.s_addr = INADDR_ANY; + + while ( !bSuccess && + ( iClientPortIncrement <= NUM_SOCKET_PORTS_TO_TRY ) ) + { + UdpSocketInAddr.sin_port = htons ( iPortNumber + iClientPortIncrement ); + + bSuccess = ( bind ( UdpSocket , + (SOCKADDR*) &UdpSocketInAddr, + sizeof ( SOCKADDR_IN ) ) == 0 ); + + iClientPortIncrement++; + } +#else while ( !bSuccess && ( iClientPortIncrement <= NUM_SOCKET_PORTS_TO_TRY ) ) { @@ -53,6 +82,7 @@ void CSocket::Init ( const quint16 iPortNumber ) iClientPortIncrement++; } +#endif } else { @@ -85,6 +115,18 @@ void CSocket::Init ( const quint16 iPortNumber ) QObject::connect ( &SocketDevice, SIGNAL ( readyRead() ), this, SLOT ( OnDataReceived() ), Qt::BlockingQueuedConnection ); */ + + +// TEST +QObject::connect ( this, + SIGNAL ( ParseMessageBody ( CVector, int, int ) ), + pChannel, SLOT ( OnParseMessageBody ( CVector, int, int ) ) ); + +QObject::connect ( this, + SIGNAL ( DetectedCLMessage ( CVector, int ) ), + pChannel, SLOT ( OnDetectedCLMessage ( CVector, int ) ) ); + + } else { @@ -100,6 +142,16 @@ void CSocket::Init ( const quint16 iPortNumber ) #endif } +CSocket::~CSocket() +{ +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD +# ifdef _WIN32 + // the Windows socket must be cleanup on shutdown + WSACleanup(); +# endif +#endif +} + void CSocket::SendPacket ( const CVector& vecbySendBuf, const CHostAddress& HostAddr ) { @@ -113,11 +165,34 @@ void CSocket::SendPacket ( const CVector& vecbySendBuf, // char vector in "const char*", for this we first convert the const // uint8_t vector in a read/write uint8_t vector and then do the cast to // const char*) - SocketDevice.writeDatagram ( - (const char*) &( (CVector) vecbySendBuf )[0], - iVecSizeOut, - HostAddr.InetAddr, - HostAddr.iPort ); +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + // note that the client uses the socket directly for performance reasons + if ( bIsClient ) + { + SOCKADDR_IN UdpSocketOutAddr; + + UdpSocketOutAddr.sin_family = AF_INET; + UdpSocketOutAddr.sin_port = htons ( HostAddr.iPort ); + UdpSocketOutAddr.sin_addr.s_addr = htonl ( HostAddr.InetAddr.toIPv4Address() ); + + sendto ( UdpSocket, + (const char*) &( (CVector) vecbySendBuf )[0], + iVecSizeOut, + 0, + (SOCKADDR*) &UdpSocketOutAddr, + sizeof ( SOCKADDR_IN ) ); + } + else + { +#endif + SocketDevice.writeDatagram ( + (const char*) &( (CVector) vecbySendBuf )[0], + iVecSizeOut, + HostAddr.InetAddr, + HostAddr.iPort ); +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + } +#endif } } @@ -138,14 +213,28 @@ bool CSocket::GetAndResetbJitterBufferOKFlag() void CSocket::OnDataReceived() { +#ifndef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD while ( SocketDevice.hasPendingDatagrams() ) +#endif { // read block from network interface and query address of sender +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + SOCKADDR_IN SenderAddr; + int SenderAddrSize = sizeof ( SOCKADDR_IN ); + + const long iNumBytesRead = recvfrom ( UdpSocket, + (char*) &vecbyRecBuf[0], + MAX_SIZE_BYTES_NETW_BUF, + 0, + (SOCKADDR*) &SenderAddr, + &SenderAddrSize ); +#else const int iNumBytesRead = SocketDevice.readDatagram ( (char*) &vecbyRecBuf[0], MAX_SIZE_BYTES_NETW_BUF, &SenderAddress, &SenderPort ); +#endif // check if an error occurred if ( iNumBytesRead < 0 ) @@ -154,8 +243,13 @@ void CSocket::OnDataReceived() } // convert address of client +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + RecHostAddr.InetAddr.setAddress ( ntohl ( SenderAddr.sin_addr.s_addr ) ); + RecHostAddr.iPort = ntohs ( SenderAddr.sin_port ); +#else RecHostAddr.InetAddr = SenderAddress; RecHostAddr.iPort = SenderPort; +#endif if ( bIsClient ) { @@ -167,7 +261,11 @@ void CSocket::OnDataReceived() pChannel->IsEnabled() ) { // this network packet is valid, put it in the channel +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + switch ( pChannel->PutData ( vecbyRecBuf, iNumBytesRead, this ) ) +#else switch ( pChannel->PutData ( vecbyRecBuf, iNumBytesRead ) ) +#endif { case PS_AUDIO_ERR: case PS_GEN_ERROR: diff --git a/src/socket.h b/src/socket.h index 73c94893..7cac41ce 100755 --- a/src/socket.h +++ b/src/socket.h @@ -67,20 +67,21 @@ public: bIsClient ( false ), bJitterBufferOK ( true ) { Init ( iPortNumber ); } + virtual ~CSocket(); + void SendPacket ( const CVector& vecbySendBuf, const CHostAddress& HostAddr ); bool GetAndResetbJitterBufferOKFlag(); -#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD -// TEST -void waitForReadyRead() { SocketDevice.waitForReadyRead(); } -#endif - protected: void Init ( const quint16 iPortNumber = LLCON_DEFAULT_PORT_NUMBER ); +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + SOCKET UdpSocket; +#endif QUdpSocket SocketDevice; + QMutex Mutex; CVector vecbyRecBuf; @@ -99,6 +100,14 @@ public slots: void OnDataReceived(); signals: +#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD + void DetectedCLMessage ( CVector vecbyMesBodyData, + int iRecID ); + + void ParseMessageBody ( CVector vecbyMesBodyData, + int iRecCounter, + int iRecID ); +#endif void InvalidPacketReceived ( CVector vecbyRecBuf, int iNumBytesRead, CHostAddress RecHostAddr ); @@ -167,7 +176,6 @@ protected: { while ( bRun ) { - pSocket->waitForReadyRead(); pSocket->OnDataReceived(); } }