finished implementation of a separate socket high priority thread

This commit is contained in:
Volker Fischer 2014-02-16 08:12:07 +00:00
parent bde026b06b
commit 2e5f7781ce
7 changed files with 314 additions and 446 deletions

View file

@ -5,6 +5,9 @@
- store fader solo state - store fader solo state
- improved stability of the audio stream by reducing audio drop outs (by
using a separate socket thread)
- bug fix: the fader level could not be changed if fader was on solo - bug fix: the fader level could not be changed if fader was on solo
3.3.3 3.3.3

View file

@ -36,7 +36,8 @@ win32 {
LIBS += ole32.lib \ LIBS += ole32.lib \
user32.lib \ user32.lib \
advapi32.lib \ advapi32.lib \
winmm.lib winmm.lib \
ws2_32.lib
} else:macx { } else:macx {
HEADERS += mac/sound.h HEADERS += mac/sound.h
SOURCES += mac/sound.cpp SOURCES += mac/sound.cpp

View file

@ -109,14 +109,6 @@ QObject::connect ( &Protocol,
QObject::connect ( &Protocol, QObject::connect ( &Protocol,
SIGNAL ( ReqNetTranspProps() ), SIGNAL ( ReqNetTranspProps() ),
this, SLOT ( OnReqNetTranspProps() ) ); this, SLOT ( OnReqNetTranspProps() ) );
// this connection is intended for a thread transition if we have a
// separate socket thread running
#ifndef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
QObject::connect ( this,
SIGNAL ( ParseMessageBody ( CVector<uint8_t>, int, int ) ),
this, SLOT ( OnParseMessageBody ( CVector<uint8_t>, int, int ) ) );
#endif
} }
bool CChannel::ProtocolIsEnabled() bool CChannel::ProtocolIsEnabled()
@ -447,98 +439,37 @@ void CChannel::Disconnect()
} }
} }
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD void CChannel::PutProtcolData ( const int iRecCounter,
EPutDataStat CChannel::PutData ( const CVector<uint8_t>& vecbyData, const int iRecID,
const int iNumBytes, const CVector<uint8_t>& vecbyMesBodyData,
CSocket* pSocket ) const CHostAddress& RecHostAddr )
#else
EPutDataStat CChannel::PutData ( const CVector<uint8_t>& vecbyData,
const int iNumBytes )
#endif
{ {
/* // Only process protocol message if:
Note that this function might be called from a different thread (separate // - for client only: the packet comes from the server we want to talk to
Socket thread) and therefore we should not call functions which emit signals // - the channel is enabled
themself directly but emit a signal here so that the thread transition is // - the protocol mechanism is enabled
done as early as possible. if ( ( bIsServer || ( GetAddress() == RecHostAddr ) ) &&
This is the reason why "ParseMessageBody" is not called directly but through a IsEnabled() &&
signal-slot mechanism. ProtocolIsEnabled() )
*/ {
// parse the message assuming this is a regular protocol message
Protocol.ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID );
}
}
EPutDataStat CChannel::PutAudioData ( const CVector<uint8_t>& vecbyData,
const int iNumBytes,
CHostAddress RecHostAddr )
{
// init return state // init return state
EPutDataStat eRet = PS_GEN_ERROR; EPutDataStat eRet = PS_GEN_ERROR;
if ( bIsEnabled ) // Only process audio data if:
// - for client only: the packet comes from the server we want to talk to
// - the channel is enabled
if ( ( bIsServer || ( GetAddress() == RecHostAddr ) ) &&
IsEnabled() )
{ {
int iRecCounter;
int iRecID;
CVector<uint8_t> vecbyMesBodyData;
// init flag
bool bNewConnection = false;
// check if this is a protocol message by trying to parse the message
// frame
if ( !Protocol.ParseMessageFrame ( vecbyData,
iNumBytes,
vecbyMesBodyData,
iRecCounter,
iRecID ) )
{
// This is a protocol message:
// only use protocol data if protocol mechanism is enabled
if ( ProtocolIsEnabled() )
{
// in case this is a connection less message, we do not process it here
if ( Protocol.IsConnectionLessMessageID ( iRecID ) )
{
// fire a signal so that an other class can process this type of
// message
// TODO a copy of the vector is used -> avoid malloc in real-time routine
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
pSocket->EmitDetectedCLMessage ( vecbyMesBodyData, iRecID );
#else
emit DetectedCLMessage ( vecbyMesBodyData, iRecID );
#endif
// set status flag
eRet = PS_PROT_OK;
}
else
{
// parse the message assuming this is a regular protocol message
// TODO a copy of the vector is used -> avoid malloc in real-time routine
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
pSocket->EmitParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID );
#else
emit ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID );
#endif
// note that protocol OK is not correct here since we do not
// check if the protocol was ok since we emit just a signal
// and do not get any feedback on the protocol decoding state
eRet = PS_PROT_OK;
}
}
else
{
// In case we are the server and the current channel is not
// connected, we do not evaluate protocol messages but these
// messages could start the server which is not desired,
// especially not for the disconnect messages.
// We now do not start the server if a valid protocol message
// was received but only start the server on audio packets.
// set status flag
eRet = PS_PROT_OK_MESS_NOT_EVALUATED;
}
}
else
{
// This seems to be an audio packet (only try to parse audio if it
// was not a protocol packet):
MutexSocketBuf.lock(); MutexSocketBuf.lock();
{ {
// only process audio if packet has correct size // only process audio if packet has correct size
@ -569,25 +500,21 @@ EPutDataStat CChannel::PutData ( const CVector<uint8_t>& vecbyData,
// about the audio packet properties via the protocol. // about the audio packet properties via the protocol.
// check if channel was not connected, this is a new connection // check if channel was not connected, this is a new connection
// (do not fire an event directly since we are inside a mutex if ( !IsConnected() )
// region -> to avoid a dead-lock) {
bNewConnection = !IsConnected(); // overwrite status
eRet = PS_NEW_CONNECTION;
}
// reset time-out counter // reset time-out counter (note that this must be done after the
// "IsConnected()" query above)
ResetTimeOutCounter(); ResetTimeOutCounter();
} }
MutexSocketBuf.unlock(); MutexSocketBuf.unlock();
} }
else
if ( bNewConnection )
{ {
// inform other objects that new connection was established eRet = PS_AUDIO_INVALID;
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
pSocket->EmitNewConnection();
#else
emit NewConnection();
#endif
}
} }
return eRet; return eRet;
@ -657,24 +584,7 @@ EGetDataStat CChannel::GetData ( CVector<uint8_t>& vecbyData,
return eGetStatus; return eGetStatus;
} }
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD void CChannel::PrepAndSendPacket ( CHighPrioSocket* pSocket,
void CChannel::PrepAndSendPacketHPS ( CHighPrioSocket* pSocket,
const CVector<uint8_t>& vecbyNPacket,
const int iNPacketLen )
{
// TODO Doubled code!!! Same code as in PrepAndSendPacket (see below)!!!
QMutexLocker locker ( &MutexConvBuf );
// use conversion buffer to convert sound card block size in network
// block size
if ( ConvBuf.Put ( vecbyNPacket, iNPacketLen ) )
{
pSocket->SendPacket ( ConvBuf.Get(), GetAddress() );
}
}
#endif
void CChannel::PrepAndSendPacket ( CSocket* pSocket,
const CVector<uint8_t>& vecbyNPacket, const CVector<uint8_t>& vecbyNPacket,
const int iNPacketLen ) const int iNPacketLen )
{ {

View file

@ -47,9 +47,11 @@ enum EPutDataStat
PS_GEN_ERROR, PS_GEN_ERROR,
PS_AUDIO_OK, PS_AUDIO_OK,
PS_AUDIO_ERR, PS_AUDIO_ERR,
PS_AUDIO_INVALID,
PS_PROT_OK, PS_PROT_OK,
PS_PROT_OK_MESS_NOT_EVALUATED, PS_PROT_OK_MESS_NOT_EVALUATED,
PS_PROT_ERR PS_PROT_ERR,
PS_NEW_CONNECTION
}; };
@ -63,27 +65,21 @@ public:
// use constructor initialization in the server for a vector of channels // use constructor initialization in the server for a vector of channels
CChannel ( const bool bNIsServer = true ); CChannel ( const bool bNIsServer = true );
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD void PutProtcolData ( const int iRecCounter,
EPutDataStat PutData ( const CVector<uint8_t>& vecbyData, const int iRecID,
const CVector<uint8_t>& vecbyMesBodyData,
const CHostAddress& RecHostAddr );
EPutDataStat PutAudioData ( const CVector<uint8_t>& vecbyData,
const int iNumBytes, const int iNumBytes,
CSocket* pSocket = 0 ); // TODO remove the "= 0"! CHostAddress RecHostAddr );
#else
EPutDataStat PutData ( const CVector<uint8_t>& vecbyData,
const int iNumBytes );
#endif
EGetDataStat GetData ( CVector<uint8_t>& vecbyData, EGetDataStat GetData ( CVector<uint8_t>& vecbyData,
const int iNumBytes ); const int iNumBytes );
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD void PrepAndSendPacket ( CHighPrioSocket* pSocket,
void PrepAndSendPacketHPS ( CHighPrioSocket* pSocket,
const CVector<uint8_t>& vecbyNPacket, const CVector<uint8_t>& vecbyNPacket,
const int iNPacketLen ); const int iNPacketLen );
#endif
void PrepAndSendPacket ( CSocket* pSocket,
const CVector<uint8_t>& vecbyNPacket,
const int iNPacketLen );
void ResetTimeOutCounter() { iConTimeOut = iConTimeOutStartVal; } void ResetTimeOutCounter() { iConTimeOut = iConTimeOutStartVal; }
bool IsConnected() const { return iConTimeOut > 0; } bool IsConnected() const { return iConTimeOut > 0; }
@ -235,18 +231,22 @@ public slots:
Protocol.ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID ); Protocol.ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID );
} }
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD void OnProtcolMessageReceived ( int iRecCounter,
void OnDetectedCLMessage ( CVector<uint8_t> vecbyMesBodyData, int iRecID,
int iRecID ) CVector<uint8_t> vecbyMesBodyData,
CHostAddress RecHostAddr )
{ {
emit DetectedCLMessage ( vecbyMesBodyData, iRecID ); PutProtcolData ( iRecCounter, iRecID, vecbyMesBodyData, RecHostAddr );
} }
void OnNewConnection() void OnProtcolCLMessageReceived ( int iRecID,
CVector<uint8_t> vecbyMesBodyData,
CHostAddress RecHostAddr )
{ {
emit NewConnection(); emit DetectedCLMessage ( vecbyMesBodyData, iRecID, RecHostAddr );
} }
#endif
void OnNewConnection() { emit NewConnection(); }
signals: signals:
void MessReadyForSending ( CVector<uint8_t> vecMessage ); void MessReadyForSending ( CVector<uint8_t> vecMessage );
@ -265,7 +265,8 @@ signals:
void Disconnected(); void Disconnected();
void DetectedCLMessage ( CVector<uint8_t> vecbyMesBodyData, void DetectedCLMessage ( CVector<uint8_t> vecbyMesBodyData,
int iRecID ); int iRecID,
CHostAddress RecHostAddr );
void ParseMessageBody ( CVector<uint8_t> vecbyMesBodyData, void ParseMessageBody ( CVector<uint8_t> vecbyMesBodyData,
int iRecCounter, int iRecCounter,

View file

@ -130,9 +130,10 @@ public:
void Stop(); void Stop();
bool IsRunning() { return HighPrecisionTimer.isActive(); } bool IsRunning() { return HighPrecisionTimer.isActive(); }
bool PutData ( const CVector<uint8_t>& vecbyRecBuf, bool PutAudioData ( const CVector<uint8_t>& vecbyRecBuf,
const int iNumBytesRead, const int iNumBytesRead,
const CHostAddress& HostAdr ); const CHostAddress& HostAdr,
int& iCurChanID );
void GetConCliParam ( CVector<CHostAddress>& vecHostAddresses, void GetConCliParam ( CVector<CHostAddress>& vecHostAddresses,
CVector<QString>& vecsName, CVector<QString>& vecsName,
@ -242,7 +243,7 @@ protected:
CVector<uint8_t> vecbyCodedData; CVector<uint8_t> vecbyCodedData;
// actual working objects // actual working objects
CSocket Socket; CHighPrioSocket Socket;
// logging // logging
CServerLogging Logging; CServerLogging Logging;
@ -269,13 +270,26 @@ signals:
public slots: public slots:
void OnTimer(); void OnTimer();
void OnSendProtMessage ( int iChID, CVector<uint8_t> vecMessage );
void OnNewConnection ( int iChID );
void OnSendCLProtMessage ( CHostAddress InetAddr, CVector<uint8_t> vecMessage );
void OnDetCLMess ( const CVector<uint8_t>& vecbyMesBodyData, void OnSendProtMessage ( int iChID,
const int iRecID, CVector<uint8_t> vecMessage );
const CHostAddress& InetAddr );
void OnNewConnection ( int iChID,
CHostAddress RecHostAddr );
void OnServerFull ( CHostAddress RecHostAddr );
void OnSendCLProtMessage ( CHostAddress InetAddr,
CVector<uint8_t> vecMessage );
void OnProtcolCLMessageReceived ( int iRecID,
CVector<uint8_t> vecbyMesBodyData,
CHostAddress RecHostAddr );
void OnProtcolMessageReceived ( int iRecCounter,
int iRecID,
CVector<uint8_t> vecbyMesBodyData,
CHostAddress RecHostAddr );
void OnCLPingReceived ( CHostAddress InetAddr, int iMs ) void OnCLPingReceived ( CHostAddress InetAddr, int iMs )
{ ConnLessProtocol.CreateCLPingMes ( InetAddr, iMs ); } { ConnLessProtocol.CreateCLPingMes ( InetAddr, iMs ); }
@ -341,48 +355,6 @@ public slots:
void OnSendProtMessCh18 ( CVector<uint8_t> mess ) { OnSendProtMessage ( 18, mess ); } void OnSendProtMessCh18 ( CVector<uint8_t> mess ) { OnSendProtMessage ( 18, mess ); }
void OnSendProtMessCh19 ( CVector<uint8_t> mess ) { OnSendProtMessage ( 19, mess ); } void OnSendProtMessCh19 ( CVector<uint8_t> mess ) { OnSendProtMessage ( 19, mess ); }
void OnDetCLMessCh0 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[0].GetAddress() ); }
void OnDetCLMessCh1 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[1].GetAddress() ); }
void OnDetCLMessCh2 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[2].GetAddress() ); }
void OnDetCLMessCh3 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[3].GetAddress() ); }
void OnDetCLMessCh4 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[4].GetAddress() ); }
void OnDetCLMessCh5 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[5].GetAddress() ); }
void OnDetCLMessCh6 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[6].GetAddress() ); }
void OnDetCLMessCh7 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[7].GetAddress() ); }
void OnDetCLMessCh8 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[8].GetAddress() ); }
void OnDetCLMessCh9 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[9].GetAddress() ); }
void OnDetCLMessCh10 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[10].GetAddress() ); }
void OnDetCLMessCh11 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[11].GetAddress() ); }
void OnDetCLMessCh12 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[12].GetAddress() ); }
void OnDetCLMessCh13 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[13].GetAddress() ); }
void OnDetCLMessCh14 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[14].GetAddress() ); }
void OnDetCLMessCh15 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[15].GetAddress() ); }
void OnDetCLMessCh16 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[16].GetAddress() ); }
void OnDetCLMessCh17 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[17].GetAddress() ); }
void OnDetCLMessCh18 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[18].GetAddress() ); }
void OnDetCLMessCh19 ( CVector<uint8_t> vData, int iID ) { OnDetCLMess ( vData, iID, vecChannels[19].GetAddress() ); }
void OnNewConnectionCh0() { OnNewConnection ( 0 ); }
void OnNewConnectionCh1() { OnNewConnection ( 1 ); }
void OnNewConnectionCh2() { OnNewConnection ( 2 ); }
void OnNewConnectionCh3() { OnNewConnection ( 3 ); }
void OnNewConnectionCh4() { OnNewConnection ( 4 ); }
void OnNewConnectionCh5() { OnNewConnection ( 5 ); }
void OnNewConnectionCh6() { OnNewConnection ( 6 ); }
void OnNewConnectionCh7() { OnNewConnection ( 7 ); }
void OnNewConnectionCh8() { OnNewConnection ( 8 ); }
void OnNewConnectionCh9() { OnNewConnection ( 9 ); }
void OnNewConnectionCh10() { OnNewConnection ( 10 ); }
void OnNewConnectionCh11() { OnNewConnection ( 11 ); }
void OnNewConnectionCh12() { OnNewConnection ( 12 ); }
void OnNewConnectionCh13() { OnNewConnection ( 13 ); }
void OnNewConnectionCh14() { OnNewConnection ( 14 ); }
void OnNewConnectionCh15() { OnNewConnection ( 15 ); }
void OnNewConnectionCh16() { OnNewConnection ( 16 ); }
void OnNewConnectionCh17() { OnNewConnection ( 17 ); }
void OnNewConnectionCh18() { OnNewConnection ( 18 ); }
void OnNewConnectionCh19() { OnNewConnection ( 19 ); }
void OnReqConnClientsListCh0() { CreateAndSendChanListForThisChan ( 0 ); } void OnReqConnClientsListCh0() { CreateAndSendChanListForThisChan ( 0 ); }
void OnReqConnClientsListCh1() { CreateAndSendChanListForThisChan ( 1 ); } void OnReqConnClientsListCh1() { CreateAndSendChanListForThisChan ( 1 ); }
void OnReqConnClientsListCh2() { CreateAndSendChanListForThisChan ( 2 ); } void OnReqConnClientsListCh2() { CreateAndSendChanListForThisChan ( 2 ); }

View file

@ -29,20 +29,26 @@
/* Implementation *************************************************************/ /* Implementation *************************************************************/
void CSocket::Init ( const quint16 iPortNumber ) void CSocket::Init ( const quint16 iPortNumber )
{ {
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
#ifdef _WIN32 #ifdef _WIN32
// for the Windows socket usage we have to start it up first // for the Windows socket usage we have to start it up first
// TODO check for error and exit application on error
WSADATA wsa; WSADATA wsa;
WSAStartup ( MAKEWORD(1, 0), &wsa ); // TODO check for error and exit application on error WSAStartup ( MAKEWORD(1, 0), &wsa );
#endif #endif
// create the UDP socket // create the UDP socket
UdpSocket = socket ( AF_INET, SOCK_DGRAM, 0 ); UdpSocket = socket ( AF_INET, SOCK_DGRAM, 0 );
#endif
// allocate memory for network receive and send buffer in samples // allocate memory for network receive and send buffer in samples
vecbyRecBuf.Init ( MAX_SIZE_BYTES_NETW_BUF ); vecbyRecBuf.Init ( MAX_SIZE_BYTES_NETW_BUF );
// preinitialize socket in address (only the port number is missing)
sockaddr_in UdpSocketInAddr;
UdpSocketInAddr.sin_family = AF_INET;
UdpSocketInAddr.sin_addr.s_addr = INADDR_ANY;
// initialize the listening socket // initialize the listening socket
bool bSuccess; bool bSuccess;
@ -55,12 +61,6 @@ void CSocket::Init ( const quint16 iPortNumber )
quint16 iClientPortIncrement = 10; // start value: port nubmer plus ten quint16 iClientPortIncrement = 10; // start value: port nubmer plus ten
bSuccess = false; // initialization for while loop bSuccess = false; // initialization for while loop
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
// preinitialize socket in address (only the port number is missing)
sockaddr_in UdpSocketInAddr;
UdpSocketInAddr.sin_family = AF_INET;
UdpSocketInAddr.sin_addr.s_addr = INADDR_ANY;
while ( !bSuccess && while ( !bSuccess &&
( iClientPortIncrement <= NUM_SOCKET_PORTS_TO_TRY ) ) ( iClientPortIncrement <= NUM_SOCKET_PORTS_TO_TRY ) )
{ {
@ -72,25 +72,17 @@ void CSocket::Init ( const quint16 iPortNumber )
iClientPortIncrement++; iClientPortIncrement++;
} }
#else
while ( !bSuccess &&
( iClientPortIncrement <= NUM_SOCKET_PORTS_TO_TRY ) )
{
bSuccess = SocketDevice.bind (
QHostAddress ( QHostAddress::Any ),
iPortNumber + iClientPortIncrement );
iClientPortIncrement++;
}
#endif
} }
else else
{ {
// for the server, only try the given port number and do not try out // for the server, only try the given port number and do not try out
// other port numbers to bind since it is imporatant that the server // other port numbers to bind since it is imporatant that the server
// gets the desired port number // gets the desired port number
bSuccess = SocketDevice.bind ( UdpSocketInAddr.sin_port = htons ( iPortNumber );
QHostAddress ( QHostAddress::Any ), iPortNumber );
bSuccess = ( bind ( UdpSocket ,
(sockaddr*) &UdpSocketInAddr,
sizeof ( sockaddr_in ) ) == 0 );
} }
if ( !bSuccess ) if ( !bSuccess )
@ -100,68 +92,67 @@ void CSocket::Init ( const quint16 iPortNumber )
"the software is already running).", "Network Error" ); "the software is already running).", "Network Error" );
} }
// connect the "activated" signal
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD // Connections -------------------------------------------------------------
// it is important to do the following connections in this class since we
// have a thread transition
// we have different connections for client and server
if ( bIsClient ) if ( bIsClient )
{ {
// TEST We do a test where we call "waitForReadyRead" instead of even driven method. // client connections:
/*
// We have to use a blocked queued connection since in case we use a
// separate socket thread, the "readyRead" signal would occur and our
// "OnDataReceived" function would be run in another thread. This could
// lead to a situation that a new "readRead" occurs while the processing
// of the previous signal was not finished -> the error: "Multiple
// socket notifiers for same socket" may occur.
QObject::connect ( &SocketDevice, SIGNAL ( readyRead() ),
this, SLOT ( OnDataReceived() ), Qt::BlockingQueuedConnection );
*/
// TEST
QObject::connect ( this,
SIGNAL ( ParseMessageBody ( CVector<uint8_t>, int, int ) ),
pChannel, SLOT ( OnParseMessageBody ( CVector<uint8_t>, int, int ) ) );
QObject::connect ( this, QObject::connect ( this,
SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), SIGNAL ( ProtcolMessageReceived ( int, int, CVector<uint8_t>, CHostAddress ) ),
pChannel, SLOT ( OnDetectedCLMessage ( CVector<uint8_t>, int ) ) ); pChannel, SLOT ( OnProtcolMessageReceived ( int, int, CVector<uint8_t>, CHostAddress ) ) );
QObject::connect ( this, QObject::connect ( this,
SIGNAL ( NewConnection ( CVector<uint8_t>, int ) ), SIGNAL ( ProtcolCLMessageReceived ( int, CVector<uint8_t>, CHostAddress ) ),
pChannel, SLOT ( OnNewConnection ( CVector<uint8_t>, int ) ) ); pChannel, SLOT ( OnProtcolCLMessageReceived ( int, CVector<uint8_t>, CHostAddress ) ) );
QObject::connect ( this,
SIGNAL ( NewConnection() ),
pChannel, SLOT ( OnNewConnection() ) );
} }
else else
{ {
// the server does not use a separate socket thread right now, in that // server connections:
// case we must not use the blocking queued connection, otherwise we
// would get a dead lock QObject::connect ( this,
QObject::connect ( &SocketDevice, SIGNAL ( readyRead() ), SIGNAL ( ProtcolMessageReceived ( int, int, CVector<uint8_t>, CHostAddress ) ),
this, SLOT ( OnDataReceived() ) ); pServer, SLOT ( OnProtcolMessageReceived ( int, int, CVector<uint8_t>, CHostAddress ) ) );
QObject::connect ( this,
SIGNAL ( ProtcolCLMessageReceived ( int, CVector<uint8_t>, CHostAddress ) ),
pServer, SLOT ( OnProtcolCLMessageReceived ( int, CVector<uint8_t>, CHostAddress ) ) );
QObject::connect ( this,
SIGNAL ( NewConnection ( int, CHostAddress ) ),
pServer, SLOT ( OnNewConnection ( int, CHostAddress ) ) );
QObject::connect ( this,
SIGNAL ( ServerFull ( CHostAddress ) ),
pServer, SLOT ( OnServerFull ( CHostAddress ) ) );
} }
#else
QObject::connect ( &SocketDevice, SIGNAL ( readyRead() ),
this, SLOT ( OnDataReceived() ) );
#endif
} }
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
void CSocket::Close() void CSocket::Close()
{ {
// closesocket will cause recvfrom to return with an error because the // closesocket will cause recvfrom to return with an error because the
// socket is closed -> then the thread can safely be shut down // socket is closed -> then the thread can safely be shut down
#ifdef _WIN32
closesocket ( UdpSocket ); closesocket ( UdpSocket );
} #else
close ( UdpSocket );
#endif #endif
}
CSocket::~CSocket() CSocket::~CSocket()
{ {
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
#ifdef _WIN32 #ifdef _WIN32
// the Windows socket must be cleanup on shutdown // the Windows socket must be cleanup on shutdown
WSACleanup(); WSACleanup();
#endif #endif
#endif
} }
void CSocket::SendPacket ( const CVector<uint8_t>& vecbySendBuf, void CSocket::SendPacket ( const CVector<uint8_t>& vecbySendBuf,
@ -177,10 +168,6 @@ void CSocket::SendPacket ( const CVector<uint8_t>& vecbySendBuf,
// char vector in "const char*", for this we first convert the const // char vector in "const char*", for this we first convert the const
// uint8_t vector in a read/write uint8_t vector and then do the cast to // uint8_t vector in a read/write uint8_t vector and then do the cast to
// const char*) // const char*)
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
// note that the client uses the socket directly for performance reasons
if ( bIsClient )
{
sockaddr_in UdpSocketOutAddr; sockaddr_in UdpSocketOutAddr;
UdpSocketOutAddr.sin_family = AF_INET; UdpSocketOutAddr.sin_family = AF_INET;
@ -194,18 +181,6 @@ void CSocket::SendPacket ( const CVector<uint8_t>& vecbySendBuf,
(sockaddr*) &UdpSocketOutAddr, (sockaddr*) &UdpSocketOutAddr,
sizeof ( sockaddr_in ) ); sizeof ( sockaddr_in ) );
} }
else
{
#endif
SocketDevice.writeDatagram (
(const char*) &( (CVector<uint8_t>) vecbySendBuf )[0],
iVecSizeOut,
HostAddr.InetAddr,
HostAddr.iPort );
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
}
#endif
}
} }
bool CSocket::GetAndResetbJitterBufferOKFlag() bool CSocket::GetAndResetbJitterBufferOKFlag()
@ -225,12 +200,15 @@ bool CSocket::GetAndResetbJitterBufferOKFlag()
void CSocket::OnDataReceived() void CSocket::OnDataReceived()
{ {
#ifndef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD /*
while ( SocketDevice.hasPendingDatagrams() ) The strategy of this function is that only the "put audio" function is
#endif called directly (i.e. the high thread priority is used) and all other less
{ important things like protocol parsing and acting on protocol messages is
done in the low priority thread. To get a thread transition, we have to
use the signal/slot mechanism (i.e. we use messages for that).
*/
// read block from network interface and query address of sender // read block from network interface and query address of sender
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
sockaddr_in SenderAddr; sockaddr_in SenderAddr;
#ifdef _WIN32 #ifdef _WIN32
int SenderAddrSize = sizeof ( sockaddr_in ); int SenderAddrSize = sizeof ( sockaddr_in );
@ -244,13 +222,6 @@ void CSocket::OnDataReceived()
0, 0,
(sockaddr*) &SenderAddr, (sockaddr*) &SenderAddr,
&SenderAddrSize ); &SenderAddrSize );
#else
const int iNumBytesRead =
SocketDevice.readDatagram ( (char*) &vecbyRecBuf[0],
MAX_SIZE_BYTES_NETW_BUF,
&SenderAddress,
&SenderPort );
#endif
// check if an error occurred or no data could be read // check if an error occurred or no data could be read
if ( iNumBytesRead <= 0 ) if ( iNumBytesRead <= 0 )
@ -259,54 +230,77 @@ void CSocket::OnDataReceived()
} }
// convert address of client // convert address of client
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
RecHostAddr.InetAddr.setAddress ( ntohl ( SenderAddr.sin_addr.s_addr ) ); RecHostAddr.InetAddr.setAddress ( ntohl ( SenderAddr.sin_addr.s_addr ) );
RecHostAddr.iPort = ntohs ( SenderAddr.sin_port ); RecHostAddr.iPort = ntohs ( SenderAddr.sin_port );
#else
RecHostAddr.InetAddr = SenderAddress;
RecHostAddr.iPort = SenderPort;
#endif
// check if this is a protocol message
int iRecCounter;
int iRecID;
CVector<uint8_t> vecbyMesBodyData;
if ( !CProtocol::ParseMessageFrame ( vecbyRecBuf,
iNumBytesRead,
vecbyMesBodyData,
iRecCounter,
iRecID ) )
{
// this is a protocol message, check the type of the message
if ( CProtocol::IsConnectionLessMessageID ( iRecID ) )
{
// TODO a copy of the vector is used -> avoid malloc in real-time routine
emit ProtcolCLMessageReceived ( iRecID, vecbyMesBodyData, RecHostAddr );
}
else
{
// TODO a copy of the vector is used -> avoid malloc in real-time routine
emit ProtcolMessageReceived ( iRecCounter, iRecID, vecbyMesBodyData, RecHostAddr );
}
}
else
{
// this is most probably a regular audio packet
if ( bIsClient ) if ( bIsClient )
{ {
// client: // client:
// check if packet comes from the server we want to connect and that switch ( pChannel->PutAudioData ( vecbyRecBuf, iNumBytesRead, RecHostAddr ) )
// the channel is enabled
if ( ( pChannel->GetAddress() == RecHostAddr ) &&
pChannel->IsEnabled() )
{
// this network packet is valid, put it in the channel
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
switch ( pChannel->PutData ( vecbyRecBuf, iNumBytesRead, this ) )
#else
switch ( pChannel->PutData ( vecbyRecBuf, iNumBytesRead ) )
#endif
{ {
case PS_AUDIO_ERR: case PS_AUDIO_ERR:
case PS_GEN_ERROR: case PS_GEN_ERROR:
bJitterBufferOK = false; bJitterBufferOK = false;
break; break;
case PS_NEW_CONNECTION:
// inform other objects that new connection was established
emit NewConnection();
break;
case PS_AUDIO_INVALID:
// inform about received invalid packet by fireing an event
emit InvalidPacketReceived ( RecHostAddr );
break;
default: default:
// do nothing // do nothing
break; break;
} }
} }
else else
{
// inform about received invalid packet by fireing an event
emit InvalidPacketReceived ( vecbyRecBuf,
iNumBytesRead,
RecHostAddr );
}
}
else
{ {
// server: // server:
if ( pServer->PutData ( vecbyRecBuf, iNumBytesRead, RecHostAddr ) ) int iCurChanID;
if ( pServer->PutAudioData ( vecbyRecBuf, iNumBytesRead, RecHostAddr, iCurChanID ) )
{ {
// we have a new connection, emit a signal
emit NewConnection ( iCurChanID, RecHostAddr );
// this was an audio packet, start server if it is in sleep mode // this was an audio packet, start server if it is in sleep mode
if ( !pServer->IsRunning() ) if ( !pServer->IsRunning() )
{ {
@ -315,6 +309,13 @@ void CSocket::OnDataReceived()
new CCustomEvent ( MS_PACKET_RECEIVED, 0, 0 ) ); new CCustomEvent ( MS_PACKET_RECEIVED, 0, 0 ) );
} }
} }
// check if no channel is available
if ( iCurChanID == INVALID_CHANNEL_ID )
{
// fire message for the state that no free channel is available
emit ServerFull ( RecHostAddr );
}
} }
} }
} }

View file

@ -27,20 +27,17 @@
#include <QObject> #include <QObject>
#include <QMessageBox> #include <QMessageBox>
#include <QUdpSocket>
#include <QSocketNotifier>
#include <QThread> #include <QThread>
#include <QMutex> #include <QMutex>
#include <vector> #include <vector>
#include "global.h" #include "global.h"
#include "protocol.h" #include "protocol.h"
#include "util.h" #include "util.h"
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
#ifndef _WIN32 #ifndef _WIN32
# include <netinet/in.h> # include <netinet/in.h>
# include <sys/socket.h> # include <sys/socket.h>
#endif #endif
#endif
// The header files channel.h and server.h require to include this header file // The header files channel.h and server.h require to include this header file
// so we get a cyclic dependency. To solve this issue, a prototype of the // so we get a cyclic dependency. To solve this issue, a prototype of the
@ -55,7 +52,7 @@ class CChannel; // forward declaration of CChannel
/* Classes ********************************************************************/ /* Classes ********************************************************************/
/* Base socket class ---------------------------------------------------------*/ /* Base socket class -------------------------------------------------------- */
class CSocket : public QObject class CSocket : public QObject
{ {
Q_OBJECT Q_OBJECT
@ -79,40 +76,16 @@ public:
const CHostAddress& HostAddr ); const CHostAddress& HostAddr );
bool GetAndResetbJitterBufferOKFlag(); bool GetAndResetbJitterBufferOKFlag();
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
void Close(); void Close();
void EmitDetectedCLMessage ( const CVector<uint8_t>& vecbyMesBodyData,
const int iRecID )
{
emit DetectedCLMessage ( vecbyMesBodyData, iRecID );
}
void EmitParseMessageBody ( const CVector<uint8_t>& vecbyMesBodyData,
const int iRecCounter,
const int iRecID )
{
emit ParseMessageBody ( vecbyMesBodyData, iRecCounter, iRecID );
}
void EmitNewConnection()
{
emit NewConnection();
}
#endif
protected: protected:
void Init ( const quint16 iPortNumber = LLCON_DEFAULT_PORT_NUMBER ); void Init ( const quint16 iPortNumber = LLCON_DEFAULT_PORT_NUMBER );
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
#ifdef _WIN32 #ifdef _WIN32
SOCKET UdpSocket; SOCKET UdpSocket;
#else #else
int UdpSocket; int UdpSocket;
#endif #endif
#endif
QUdpSocket SocketDevice;
QMutex Mutex; QMutex Mutex;
@ -132,55 +105,56 @@ public slots:
void OnDataReceived(); void OnDataReceived();
signals: signals:
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD void NewConnection(); // for the client
void DetectedCLMessage ( CVector<uint8_t> vecbyMesBodyData,
int iRecID );
void ParseMessageBody ( CVector<uint8_t> vecbyMesBodyData, void NewConnection ( int iChID,
int iRecCounter, CHostAddress RecHostAddr ); // for the server
int iRecID );
void NewConnection(); void ServerFull ( CHostAddress RecHostAddr );
#endif
void InvalidPacketReceived ( CVector<uint8_t> vecbyRecBuf, void InvalidPacketReceived ( CHostAddress RecHostAddr );
int iNumBytesRead,
CHostAddress RecHostAddr ); void ProtcolMessageReceived ( int iRecCounter,
int iRecID,
CVector<uint8_t> vecbyMesBodyData,
CHostAddress HostAdr );
void ProtcolCLMessageReceived ( int iRecID,
CVector<uint8_t> vecbyMesBodyData,
CHostAddress HostAdr );
}; };
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD /* Socket which runs in a separate high priority thread --------------------- */
/* Socket which runs in a separate high priority thread ----------------------*/ // The receive socket should be put in a high priority thread to ensure the GUI
// does not effect the stability of the audio stream (e.g. if the GUI is on
// high load because of a table update, the incoming network packets must still
// be put in the jitter buffer with highest priority).
class CHighPrioSocket : public QObject class CHighPrioSocket : public QObject
{ {
Q_OBJECT Q_OBJECT
public: public:
CHighPrioSocket ( CChannel* pNewChannel, CHighPrioSocket ( CChannel* pNewChannel,
const quint16 iPortNumber ) : const quint16 iPortNumber )
Socket ( pNewChannel, iPortNumber ) : Socket ( pNewChannel, iPortNumber ) { Init(); }
{
// Creation of the new socket thread which has to have the highest
// possible thread priority to make sure the jitter buffer is reliably
// filled with the network audio packets and does not get interrupted
// by other GUI threads. The following code is based on:
// http://qt-project.org/wiki/Threads_Events_QObjects
Socket.moveToThread ( &NetworkWorkerThread );
NetworkWorkerThread.SetSocket ( &Socket ); CHighPrioSocket ( CServer* pNewServer,
const quint16 iPortNumber )
NetworkWorkerThread.start ( QThread::TimeCriticalPriority ); : Socket ( pNewServer, iPortNumber ) { Init(); }
// connect the "InvalidPacketReceived" signal
QObject::connect ( &Socket,
SIGNAL ( InvalidPacketReceived ( CVector<uint8_t>, int, CHostAddress ) ),
SIGNAL ( InvalidPacketReceived ( CVector<uint8_t>, int, CHostAddress ) ) );
}
virtual ~CHighPrioSocket() virtual ~CHighPrioSocket()
{ {
NetworkWorkerThread.exit(); NetworkWorkerThread.exit();
} }
void Start()
{
// starts the high priority socket receive thread (with using blocking
// socket request call)
NetworkWorkerThread.start ( QThread::TimeCriticalPriority );
}
void SendPacket ( const CVector<uint8_t>& vecbySendBuf, void SendPacket ( const CVector<uint8_t>& vecbySendBuf,
const CHostAddress& HostAddr ) const CHostAddress& HostAddr )
{ {
@ -232,14 +206,20 @@ protected:
bool bRun; bool bRun;
}; };
void Init()
{
// Creation of the new socket thread which has to have the highest
// possible thread priority to make sure the jitter buffer is reliably
// filled with the network audio packets and does not get interrupted
// by other GUI threads. The following code is based on:
// http://qt-project.org/wiki/Threads_Events_QObjects
Socket.moveToThread ( &NetworkWorkerThread );
NetworkWorkerThread.SetSocket ( &Socket );
}
CSocketThread NetworkWorkerThread; CSocketThread NetworkWorkerThread;
CSocket Socket; CSocket Socket;
signals:
void InvalidPacketReceived ( CVector<uint8_t> vecbyRecBuf,
int iNumBytesRead,
CHostAddress RecHostAddr );
}; };
#endif
#endif /* !defined ( SOCKET_HOIHGE76GEKJH98_3_4344_BB23945IUHF1912__INCLUDED_ ) */ #endif /* !defined ( SOCKET_HOIHGE76GEKJH98_3_4344_BB23945IUHF1912__INCLUDED_ ) */