separate socket thread works now fine under Windows (only for Windows right now)

This commit is contained in:
Volker Fischer 2014-02-11 17:35:39 +00:00
parent 7afee80feb
commit 9ca6df618f
4 changed files with 152 additions and 11 deletions

View File

@ -112,9 +112,11 @@ QObject::connect ( &Protocol,
// this connection is intended for a thread transition if we have a // this connection is intended for a thread transition if we have a
// separate socket thread running // separate socket thread running
#ifndef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
QObject::connect ( this, QObject::connect ( this,
SIGNAL ( ParseMessageBody ( CVector<uint8_t>, int, int ) ), SIGNAL ( ParseMessageBody ( CVector<uint8_t>, int, int ) ),
this, SLOT ( OnParseMessageBody ( CVector<uint8_t>, int, int ) ) ); this, SLOT ( OnParseMessageBody ( CVector<uint8_t>, int, int ) ) );
#endif
} }
bool CChannel::ProtocolIsEnabled() bool CChannel::ProtocolIsEnabled()
@ -437,8 +439,14 @@ void CChannel::Disconnect()
} }
} }
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
EPutDataStat CChannel::PutData ( const CVector<uint8_t>& vecbyData,
const int iNumBytes,
CSocket* pSocket )
#else
EPutDataStat CChannel::PutData ( const CVector<uint8_t>& vecbyData, EPutDataStat CChannel::PutData ( const CVector<uint8_t>& vecbyData,
const int iNumBytes ) const int iNumBytes )
#endif
{ {
/* /*
Note that this function might be called from a different thread (separate Note that this function might be called from a different thread (separate
@ -480,7 +488,11 @@ EPutDataStat CChannel::PutData ( const CVector<uint8_t>& vecbyData,
// fire a signal so that an other class can process this type of // fire a signal so that an other class can process this type of
// message // message
// TODO a copy of the vector is used -> avoid malloc in real-time routine // TODO a copy of the vector is used -> avoid malloc in real-time routine
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
emit pSocket->DetectedCLMessage ( vecbyMesBodyData, iRecID );
#else
emit DetectedCLMessage ( vecbyMesBodyData, iRecID ); emit DetectedCLMessage ( vecbyMesBodyData, iRecID );
#endif
// set status flag // set status flag
eRet = PS_PROT_OK; eRet = PS_PROT_OK;
@ -489,7 +501,11 @@ EPutDataStat CChannel::PutData ( const CVector<uint8_t>& vecbyData,
{ {
// parse the message assuming this is a regular protocol message // parse the message assuming this is a regular protocol message
// TODO a copy of the vector is used -> avoid malloc in real-time routine // TODO a copy of the vector is used -> avoid malloc in real-time routine
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
emit pSocket->ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID );
#else
emit ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID ); emit ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID );
#endif
// note that protocol OK is not correct here since we do not // note that protocol OK is not correct here since we do not
// check if the protocol was ok since we emit just a signal // check if the protocol was ok since we emit just a signal
@ -558,6 +574,11 @@ EPutDataStat CChannel::PutData ( const CVector<uint8_t>& vecbyData,
if ( bNewConnection ) if ( bNewConnection )
{ {
// inform other objects that new connection was established // inform other objects that new connection was established
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
// TODO socket thread???
#endif
emit NewConnection(); emit NewConnection();
} }
} }

View File

@ -63,8 +63,14 @@ public:
// use constructor initialization in the server for a vector of channels // use constructor initialization in the server for a vector of channels
CChannel ( const bool bNIsServer = true ); CChannel ( const bool bNIsServer = true );
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
EPutDataStat PutData ( const CVector<uint8_t>& vecbyData,
const int iNumBytes,
CSocket* pSocket = 0 ); // TODO remove the "= 0"!
#else
EPutDataStat PutData ( const CVector<uint8_t>& vecbyData, EPutDataStat PutData ( const CVector<uint8_t>& vecbyData,
const int iNumBytes ); const int iNumBytes );
#endif
EGetDataStat GetData ( CVector<uint8_t>& vecbyData, EGetDataStat GetData ( CVector<uint8_t>& vecbyData,
const int iNumBytes ); const int iNumBytes );
@ -227,6 +233,14 @@ public slots:
Protocol.ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID ); Protocol.ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID );
} }
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
void OnDetectedCLMessage ( CVector<uint8_t> vecbyMesBodyData,
int iRecID )
{
emit DetectedCLMessage ( vecbyMesBodyData, iRecID );
}
#endif
signals: signals:
void MessReadyForSending ( CVector<uint8_t> vecMessage ); void MessReadyForSending ( CVector<uint8_t> vecMessage );
void NewConnection(); void NewConnection();

View File

@ -29,6 +29,17 @@
/* Implementation *************************************************************/ /* Implementation *************************************************************/
void CSocket::Init ( const quint16 iPortNumber ) void CSocket::Init ( const quint16 iPortNumber )
{ {
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
# ifdef _WIN32
// for the Windows socket usage we have to start it up first
WSADATA wsa;
WSAStartup ( MAKEWORD(1, 0), &wsa ); // TODO check for error and exit application on error
# endif
// create the UDP socket
UdpSocket = socket ( AF_INET, SOCK_DGRAM, 0 );
#endif
// allocate memory for network receive and send buffer in samples // allocate memory for network receive and send buffer in samples
vecbyRecBuf.Init ( MAX_SIZE_BYTES_NETW_BUF ); vecbyRecBuf.Init ( MAX_SIZE_BYTES_NETW_BUF );
@ -44,6 +55,24 @@ void CSocket::Init ( const quint16 iPortNumber )
quint16 iClientPortIncrement = 10; // start value: port nubmer plus ten quint16 iClientPortIncrement = 10; // start value: port nubmer plus ten
bSuccess = false; // initialization for while loop bSuccess = false; // initialization for while loop
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
// preinitialize socket in address (only the port number is missing)
SOCKADDR_IN UdpSocketInAddr;
UdpSocketInAddr.sin_family = AF_INET;
UdpSocketInAddr.sin_addr.s_addr = INADDR_ANY;
while ( !bSuccess &&
( iClientPortIncrement <= NUM_SOCKET_PORTS_TO_TRY ) )
{
UdpSocketInAddr.sin_port = htons ( iPortNumber + iClientPortIncrement );
bSuccess = ( bind ( UdpSocket ,
(SOCKADDR*) &UdpSocketInAddr,
sizeof ( SOCKADDR_IN ) ) == 0 );
iClientPortIncrement++;
}
#else
while ( !bSuccess && while ( !bSuccess &&
( iClientPortIncrement <= NUM_SOCKET_PORTS_TO_TRY ) ) ( iClientPortIncrement <= NUM_SOCKET_PORTS_TO_TRY ) )
{ {
@ -53,6 +82,7 @@ void CSocket::Init ( const quint16 iPortNumber )
iClientPortIncrement++; iClientPortIncrement++;
} }
#endif
} }
else else
{ {
@ -85,6 +115,18 @@ void CSocket::Init ( const quint16 iPortNumber )
QObject::connect ( &SocketDevice, SIGNAL ( readyRead() ), QObject::connect ( &SocketDevice, SIGNAL ( readyRead() ),
this, SLOT ( OnDataReceived() ), Qt::BlockingQueuedConnection ); this, SLOT ( OnDataReceived() ), Qt::BlockingQueuedConnection );
*/ */
// TEST
QObject::connect ( this,
SIGNAL ( ParseMessageBody ( CVector<uint8_t>, int, int ) ),
pChannel, SLOT ( OnParseMessageBody ( CVector<uint8_t>, int, int ) ) );
QObject::connect ( this,
SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ),
pChannel, SLOT ( OnDetectedCLMessage ( CVector<uint8_t>, int ) ) );
} }
else else
{ {
@ -100,6 +142,16 @@ void CSocket::Init ( const quint16 iPortNumber )
#endif #endif
} }
CSocket::~CSocket()
{
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
# ifdef _WIN32
// the Windows socket must be cleanup on shutdown
WSACleanup();
# endif
#endif
}
void CSocket::SendPacket ( const CVector<uint8_t>& vecbySendBuf, void CSocket::SendPacket ( const CVector<uint8_t>& vecbySendBuf,
const CHostAddress& HostAddr ) const CHostAddress& HostAddr )
{ {
@ -113,11 +165,34 @@ void CSocket::SendPacket ( const CVector<uint8_t>& vecbySendBuf,
// char vector in "const char*", for this we first convert the const // char vector in "const char*", for this we first convert the const
// uint8_t vector in a read/write uint8_t vector and then do the cast to // uint8_t vector in a read/write uint8_t vector and then do the cast to
// const char*) // const char*)
SocketDevice.writeDatagram ( #ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
(const char*) &( (CVector<uint8_t>) vecbySendBuf )[0], // note that the client uses the socket directly for performance reasons
iVecSizeOut, if ( bIsClient )
HostAddr.InetAddr, {
HostAddr.iPort ); SOCKADDR_IN UdpSocketOutAddr;
UdpSocketOutAddr.sin_family = AF_INET;
UdpSocketOutAddr.sin_port = htons ( HostAddr.iPort );
UdpSocketOutAddr.sin_addr.s_addr = htonl ( HostAddr.InetAddr.toIPv4Address() );
sendto ( UdpSocket,
(const char*) &( (CVector<uint8_t>) vecbySendBuf )[0],
iVecSizeOut,
0,
(SOCKADDR*) &UdpSocketOutAddr,
sizeof ( SOCKADDR_IN ) );
}
else
{
#endif
SocketDevice.writeDatagram (
(const char*) &( (CVector<uint8_t>) vecbySendBuf )[0],
iVecSizeOut,
HostAddr.InetAddr,
HostAddr.iPort );
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
}
#endif
} }
} }
@ -138,14 +213,28 @@ bool CSocket::GetAndResetbJitterBufferOKFlag()
void CSocket::OnDataReceived() void CSocket::OnDataReceived()
{ {
#ifndef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
while ( SocketDevice.hasPendingDatagrams() ) while ( SocketDevice.hasPendingDatagrams() )
#endif
{ {
// read block from network interface and query address of sender // read block from network interface and query address of sender
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
SOCKADDR_IN SenderAddr;
int SenderAddrSize = sizeof ( SOCKADDR_IN );
const long iNumBytesRead = recvfrom ( UdpSocket,
(char*) &vecbyRecBuf[0],
MAX_SIZE_BYTES_NETW_BUF,
0,
(SOCKADDR*) &SenderAddr,
&SenderAddrSize );
#else
const int iNumBytesRead = const int iNumBytesRead =
SocketDevice.readDatagram ( (char*) &vecbyRecBuf[0], SocketDevice.readDatagram ( (char*) &vecbyRecBuf[0],
MAX_SIZE_BYTES_NETW_BUF, MAX_SIZE_BYTES_NETW_BUF,
&SenderAddress, &SenderAddress,
&SenderPort ); &SenderPort );
#endif
// check if an error occurred // check if an error occurred
if ( iNumBytesRead < 0 ) if ( iNumBytesRead < 0 )
@ -154,8 +243,13 @@ void CSocket::OnDataReceived()
} }
// convert address of client // convert address of client
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
RecHostAddr.InetAddr.setAddress ( ntohl ( SenderAddr.sin_addr.s_addr ) );
RecHostAddr.iPort = ntohs ( SenderAddr.sin_port );
#else
RecHostAddr.InetAddr = SenderAddress; RecHostAddr.InetAddr = SenderAddress;
RecHostAddr.iPort = SenderPort; RecHostAddr.iPort = SenderPort;
#endif
if ( bIsClient ) if ( bIsClient )
{ {
@ -167,7 +261,11 @@ void CSocket::OnDataReceived()
pChannel->IsEnabled() ) pChannel->IsEnabled() )
{ {
// this network packet is valid, put it in the channel // this network packet is valid, put it in the channel
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
switch ( pChannel->PutData ( vecbyRecBuf, iNumBytesRead, this ) )
#else
switch ( pChannel->PutData ( vecbyRecBuf, iNumBytesRead ) ) switch ( pChannel->PutData ( vecbyRecBuf, iNumBytesRead ) )
#endif
{ {
case PS_AUDIO_ERR: case PS_AUDIO_ERR:
case PS_GEN_ERROR: case PS_GEN_ERROR:

View File

@ -67,20 +67,21 @@ public:
bIsClient ( false ), bIsClient ( false ),
bJitterBufferOK ( true ) { Init ( iPortNumber ); } bJitterBufferOK ( true ) { Init ( iPortNumber ); }
virtual ~CSocket();
void SendPacket ( const CVector<uint8_t>& vecbySendBuf, void SendPacket ( const CVector<uint8_t>& vecbySendBuf,
const CHostAddress& HostAddr ); const CHostAddress& HostAddr );
bool GetAndResetbJitterBufferOKFlag(); bool GetAndResetbJitterBufferOKFlag();
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
// TEST
void waitForReadyRead() { SocketDevice.waitForReadyRead(); }
#endif
protected: protected:
void Init ( const quint16 iPortNumber = LLCON_DEFAULT_PORT_NUMBER ); void Init ( const quint16 iPortNumber = LLCON_DEFAULT_PORT_NUMBER );
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
SOCKET UdpSocket;
#endif
QUdpSocket SocketDevice; QUdpSocket SocketDevice;
QMutex Mutex; QMutex Mutex;
CVector<uint8_t> vecbyRecBuf; CVector<uint8_t> vecbyRecBuf;
@ -99,6 +100,14 @@ public slots:
void OnDataReceived(); void OnDataReceived();
signals: signals:
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
void DetectedCLMessage ( CVector<uint8_t> vecbyMesBodyData,
int iRecID );
void ParseMessageBody ( CVector<uint8_t> vecbyMesBodyData,
int iRecCounter,
int iRecID );
#endif
void InvalidPacketReceived ( CVector<uint8_t> vecbyRecBuf, void InvalidPacketReceived ( CVector<uint8_t> vecbyRecBuf,
int iNumBytesRead, int iNumBytesRead,
CHostAddress RecHostAddr ); CHostAddress RecHostAddr );
@ -167,7 +176,6 @@ protected:
{ {
while ( bRun ) while ( bRun )
{ {
pSocket->waitForReadyRead();
pSocket->OnDataReceived(); pSocket->OnDataReceived();
} }
} }