finished implementation of a separate socket high priority thread

This commit is contained in:
Volker Fischer 2014-02-16 08:20:07 +00:00
parent 2e5f7781ce
commit 2ac7a92eaf
3 changed files with 124 additions and 171 deletions

View file

@ -327,11 +327,7 @@ void SetAudoCompressiontype ( const EAudComprType eNAudCompressionType );
bool bIsInitializationPhase;
CVector<unsigned char> vecCeltData;
#ifdef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
CHighPrioSocket Socket;
#else
CSocket Socket;
#endif
CSound Sound;
CStereoSignalLevelMeter SignalLevelMeter;
@ -377,10 +373,12 @@ void SetAudoCompressiontype ( const EAudComprType eNAudCompressionType );
public slots:
void OnSendProtMessage ( CVector<uint8_t> vecMessage );
void OnInvalidPacketReceived ( CVector<uint8_t> vecbyRecBuf,
int iNumBytesRead,
CHostAddress RecHostAddr );
void OnDetectedCLMessage ( CVector<uint8_t> vecbyMesBodyData, int iRecID );
void OnInvalidPacketReceived ( CHostAddress RecHostAddr );
void OnDetectedCLMessage ( CVector<uint8_t> vecbyMesBodyData,
int iRecID,
CHostAddress RecHostAddr );
void OnReqJittBufSize() { CreateServerJitterBufferMessage(); }
void OnJittBufSizeChanged ( int iNewJitBufSize );
void OnReqChanInfo() { Channel.SetRemoteInfo ( ChannelInfo ); }
@ -388,7 +386,9 @@ public slots:
void OnCLPingReceived ( CHostAddress InetAddr,
int iMs );
void OnSendCLProtMessage ( CHostAddress InetAddr, CVector<uint8_t> vecMessage );
void OnSendCLProtMessage ( CHostAddress InetAddr,
CVector<uint8_t> vecMessage );
void OnCLPingWithNumClientsReceived ( CHostAddress InetAddr,
int iMs,
int iNumClients );
@ -403,8 +403,10 @@ signals:
void ConClientListMesReceived ( CVector<CChannelInfo> vecChanInfo );
void ChatTextReceived ( QString strChatText );
void PingTimeReceived ( int iPingTime );
void CLServerListReceived ( CHostAddress InetAddr,
CVector<CServerInfo> vecServerInfo );
void CLPingTimeWithNumClientsReceived ( CHostAddress InetAddr,
int iPingTime,
int iNumClients );

View file

@ -65,21 +65,6 @@ LED bar: lbr
/* Definitions ****************************************************************/
// The receive socket should be put in a high priority thread to ensure the GUI
// does not effect the stability of the audio stream (e.g. if the GUI is on
// high load because of a table update, the incoming network packets must still
// be put in the jitter buffer with highest priority).
// Since the new code is still in "experimental state", we introduced this macro
// to enable/disable the new code.
//
// I disabled the new code again since it seems not to improve the situation
// much. Maybe things must be tweaked for it to work correctly. Since the more
// threads we have the higher is the risk of software crashes, I will only
// enable the new code on significant performance gain.
//
//#define ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
#undef ENABLE_RECEIVE_SOCKET_IN_SEPARATE_THREAD
// define this macro to get debug output
//#define _DEBUG_
#undef _DEBUG_

View file

@ -447,50 +447,6 @@ CServer::CServer ( const int iNewMaxNumChan,
QObject::connect ( &vecChannels[18], SIGNAL ( MessReadyForSending ( CVector<uint8_t> ) ), this, SLOT ( OnSendProtMessCh18 ( CVector<uint8_t> ) ) );
QObject::connect ( &vecChannels[19], SIGNAL ( MessReadyForSending ( CVector<uint8_t> ) ), this, SLOT ( OnSendProtMessCh19 ( CVector<uint8_t> ) ) );
// a connection less protocol message was detected
QObject::connect ( &vecChannels[0], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh0 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[1], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh1 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[2], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh2 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[3], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh3 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[4], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh4 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[5], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh5 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[6], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh6 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[7], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh7 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[8], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh8 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[9], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh9 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[10], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh10 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[11], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh11 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[12], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh12 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[13], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh13 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[14], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh14 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[15], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh15 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[16], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh16 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[17], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh17 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[18], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh18 ( CVector<uint8_t>, int ) ) );
QObject::connect ( &vecChannels[19], SIGNAL ( DetectedCLMessage ( CVector<uint8_t>, int ) ), this, SLOT ( OnDetCLMessCh19 ( CVector<uint8_t>, int ) ) );
// request jitter buffer size
QObject::connect ( &vecChannels[0], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh0() ) );
QObject::connect ( &vecChannels[1], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh1() ) );
QObject::connect ( &vecChannels[2], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh2() ) );
QObject::connect ( &vecChannels[3], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh3() ) );
QObject::connect ( &vecChannels[4], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh4() ) );
QObject::connect ( &vecChannels[5], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh5() ) );
QObject::connect ( &vecChannels[6], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh6() ) );
QObject::connect ( &vecChannels[7], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh7() ) );
QObject::connect ( &vecChannels[8], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh8() ) );
QObject::connect ( &vecChannels[9], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh9() ) );
QObject::connect ( &vecChannels[10], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh10() ) );
QObject::connect ( &vecChannels[11], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh11() ) );
QObject::connect ( &vecChannels[12], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh12() ) );
QObject::connect ( &vecChannels[13], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh13() ) );
QObject::connect ( &vecChannels[14], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh14() ) );
QObject::connect ( &vecChannels[15], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh15() ) );
QObject::connect ( &vecChannels[16], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh16() ) );
QObject::connect ( &vecChannels[17], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh17() ) );
QObject::connect ( &vecChannels[18], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh18() ) );
QObject::connect ( &vecChannels[19], SIGNAL ( NewConnection() ), this, SLOT ( OnNewConnectionCh19() ) );
// request connected clients list
QObject::connect ( &vecChannels[0], SIGNAL ( ReqConnClientsList() ), this, SLOT ( OnReqConnClientsListCh0() ) );
QObject::connect ( &vecChannels[1], SIGNAL ( ReqConnClientsList() ), this, SLOT ( OnReqConnClientsListCh1() ) );
@ -578,6 +534,11 @@ CServer::CServer ( const int iNewMaxNumChan,
QObject::connect ( &vecChannels[17], SIGNAL ( ServerAutoSockBufSizeChange ( int ) ), this, SLOT ( OnServerAutoSockBufSizeChangeCh17 ( int ) ) );
QObject::connect ( &vecChannels[18], SIGNAL ( ServerAutoSockBufSizeChange ( int ) ), this, SLOT ( OnServerAutoSockBufSizeChangeCh18 ( int ) ) );
QObject::connect ( &vecChannels[19], SIGNAL ( ServerAutoSockBufSizeChange ( int ) ), this, SLOT ( OnServerAutoSockBufSizeChangeCh19 ( int ) ) );
// start the socket (it is important to start the socket after all
// initializations and connections)
Socket.Start();
}
void CServer::OnSendProtMessage ( int iChID, CVector<uint8_t> vecMessage )
@ -587,7 +548,8 @@ void CServer::OnSendProtMessage ( int iChID, CVector<uint8_t> vecMessage )
Socket.SendPacket ( vecMessage, vecChannels[iChID].GetAddress() );
}
void CServer::OnNewConnection ( int iChID )
void CServer::OnNewConnection ( int iChID,
CHostAddress RecHostAddr )
{
// on a new connection we query the network transport properties for the
// audio packets (to use the correct network block size and audio
@ -600,6 +562,51 @@ void CServer::OnNewConnection ( int iChID )
// reached a state where this did not happen because of network trouble,
// client or server thinks that the connection was still active, etc.)
vecChannels[iChID].CreateReqJitBufMes();
// logging of new connected channel
Logging.AddNewConnection ( RecHostAddr.InetAddr );
// A new client connected to the server, the channel list
// at all clients have to be updated. This is done by sending
// a channel name request to the client which causes a channel
// name message to be transmitted to the server. If the server
// receives this message, the channel list will be automatically
// updated (implicitely).
// To make sure the protocol message is transmitted, the channel
// first has to be marked as connected.
//
// Usually it is not required to send the channel list to the
// client currently connecting since it automatically requests
// the channel list on a new connection (as a result, he will
// usually get the list twice which has no impact on functionality
// but will only increase the network load a tiny little bit). But
// in case the client thinks he is still connected but the server
// was restartet, it is important that we send the channel list
// at this place.
vecChannels[iChID].ResetTimeOutCounter();
vecChannels[iChID].CreateReqChanInfoMes();
// COMPATIBILITY ISSUE
// since old versions of the software did not implement the channel name
// request message, we have to explicitely send the channel list here
CreateAndSendChanListForAllConChannels();
// send welcome message (if enabled)
if ( !strWelcomeMessage.isEmpty() )
{
// create formated server welcome message and send it just to
// the client which just connected to the server
const QString strWelcomeMessageFormated =
"<b>Server Welcome Message:</b> " + strWelcomeMessage;
vecChannels[iChID].CreateChatTextMes ( strWelcomeMessageFormated );
}
}
void CServer::OnServerFull ( CHostAddress RecHostAddr )
{
// inform the calling client that no channel is free
ConnLessProtocol.CreateCLServerFullMes ( RecHostAddr );
}
void CServer::OnSendCLProtMessage ( CHostAddress InetAddr,
@ -610,15 +617,14 @@ void CServer::OnSendCLProtMessage ( CHostAddress InetAddr,
Socket.SendPacket ( vecMessage, InetAddr );
}
void CServer::OnDetCLMess ( const CVector<uint8_t>& vecbyMesBodyData,
const int iRecID,
const CHostAddress& InetAddr )
void CServer::OnProtcolCLMessageReceived ( int iRecID,
CVector<uint8_t> vecbyMesBodyData,
CHostAddress RecHostAddr )
{
// this is a special case: we received a connection less message but we are
// in a connection
// connection less messages are always processed
ConnLessProtocol.ParseConnectionLessMessageBody ( vecbyMesBodyData,
iRecID,
InetAddr );
RecHostAddr );
}
void CServer::OnCLDisconnection ( CHostAddress InetAddr )
@ -835,7 +841,7 @@ void CServer::OnTimer()
// Process data ------------------------------------------------------------
// Check if at least one client is connected. If not, stop server until
// one client is connected.
if ( iNumClients != 0 )
if ( iNumClients > 0 )
{
for ( int i = 0; i < iNumClients; i++ )
{
@ -1214,132 +1220,92 @@ int CServer::FindChannel ( const CHostAddress& CheckAddr )
return INVALID_CHANNEL_ID;
}
bool CServer::PutData ( const CVector<uint8_t>& vecbyRecBuf,
const int iNumBytesRead,
const CHostAddress& HostAdr )
void CServer::OnProtcolMessageReceived ( int iRecCounter,
int iRecID,
CVector<uint8_t> vecbyMesBodyData,
CHostAddress RecHostAddr )
{
bool bChanOK = true; // init with ok, might be overwritten
bool bNewChannelReserved = false;
bool bIsNotEvaluatedProtocolMessage = false;
Mutex.lock();
{
// find the channel with the received address
const int iCurChanID = FindChannel ( RecHostAddr );
// if the channel exists, apply the protocol message to the channel
if ( iCurChanID != INVALID_CHANNEL_ID )
{
vecChannels[iCurChanID].PutProtcolData ( iRecCounter,
iRecID,
vecbyMesBodyData,
RecHostAddr );
}
}
Mutex.unlock();
}
bool CServer::PutAudioData ( const CVector<uint8_t>& vecbyRecBuf,
const int iNumBytesRead,
const CHostAddress& HostAdr,
int& iCurChanID )
{
bool bNewConnection = false; // init return value
bool bChanOK = true; // init with ok, might be overwritten
Mutex.lock();
{
// Get channel ID ------------------------------------------------------
// check address
int iCurChanID = FindChannel ( HostAdr );
iCurChanID = FindChannel ( HostAdr );
if ( iCurChanID == INVALID_CHANNEL_ID )
{
// this is a new client, we then first check if this is a connection
// less message before we create a new official channel
if ( ConnLessProtocol.ParseConnectionLessMessageWithFrame ( vecbyRecBuf,
iNumBytesRead,
HostAdr ) )
// a new client is calling, look for free channel
iCurChanID = GetFreeChan();
if ( iCurChanID != INVALID_CHANNEL_ID )
{
// a new client is calling, look for free channel
iCurChanID = GetFreeChan();
// initialize current channel by storing the calling host
// address
vecChannels[iCurChanID].SetAddress ( HostAdr );
if ( iCurChanID != INVALID_CHANNEL_ID )
// reset channel info
vecChannels[iCurChanID].ResetInfo();
// reset the channel gains of current channel, at the same
// time reset gains of this channel ID for all other channels
for ( int i = 0; i < iMaxNumChannels; i++ )
{
// initialize current channel by storing the calling host
// address
vecChannels[iCurChanID].SetAddress ( HostAdr );
vecChannels[iCurChanID].SetGain ( i, (double) 1.0 );
// reset channel info
vecChannels[iCurChanID].ResetInfo();
// reset the channel gains of current channel, at the same
// time reset gains of this channel ID for all other channels
for ( int i = 0; i < iMaxNumChannels; i++ )
{
vecChannels[iCurChanID].SetGain ( i, (double) 1.0 );
// other channels (we do not distinguish the case if
// i == iCurChanID for simplicity)
vecChannels[i].SetGain ( iCurChanID, (double) 1.0 );
}
// set flag for new reserved channel
bNewChannelReserved = true;
}
else
{
// no free channel available
bChanOK = false;
// create and send "server full" message
ConnLessProtocol.CreateCLServerFullMes ( HostAdr );
// other channels (we do not distinguish the case if
// i == iCurChanID for simplicity)
vecChannels[i].SetGain ( iCurChanID, (double) 1.0 );
}
}
else
{
// this was a connection less protocol message, return according
// state
// no free channel available
bChanOK = false;
}
}
// Put received data in jitter buffer ----------------------------------
// Put received audio data in jitter buffer ----------------------------
if ( bChanOK )
{
// put packet in socket buffer
if ( vecChannels[iCurChanID].PutData ( vecbyRecBuf,
iNumBytesRead ) == PS_PROT_OK_MESS_NOT_EVALUATED )
if ( vecChannels[iCurChanID].PutAudioData ( vecbyRecBuf,
iNumBytesRead,
HostAdr ) == PS_NEW_CONNECTION )
{
// set flag
bIsNotEvaluatedProtocolMessage = true;
}
}
// act on new channel connection
if ( bNewChannelReserved && ( !bIsNotEvaluatedProtocolMessage ) )
{
// logging of new connected channel
Logging.AddNewConnection ( HostAdr.InetAddr );
// A new client connected to the server, the channel list
// at all clients have to be updated. This is done by sending
// a channel name request to the client which causes a channel
// name message to be transmitted to the server. If the server
// receives this message, the channel list will be automatically
// updated (implicitely).
// To make sure the protocol message is transmitted, the channel
// first has to be marked as connected.
//
// Usually it is not required to send the channel list to the
// client currently connecting since it automatically requests
// the channel list on a new connection (as a result, he will
// usually get the list twice which has no impact on functionality
// but will only increase the network load a tiny little bit). But
// in case the client thinks he is still connected but the server
// was restartet, it is important that we send the channel list
// at this place.
vecChannels[iCurChanID].ResetTimeOutCounter();
vecChannels[iCurChanID].CreateReqChanInfoMes();
// COMPATIBILITY ISSUE
// since old versions of the software did not implement the channel name
// request message, we have to explicitely send the channel list here
CreateAndSendChanListForAllConChannels();
// send welcome message (if enabled)
if ( !strWelcomeMessage.isEmpty() )
{
// create formated server welcome message and send it just to
// the client which just connected to the server
const QString strWelcomeMessageFormated =
"<b>Server Welcome Message:</b> " + strWelcomeMessage;
vecChannels[iCurChanID].CreateChatTextMes ( strWelcomeMessageFormated );
// in case we have a new connection return this information
bNewConnection = true;
}
}
}
Mutex.unlock();
// we do not want the server to be started on a protocol message but only on
// an audio packet -> consider "bIsNotEvaluatedProtocolMessage", too
return bChanOK && ( !bIsNotEvaluatedProtocolMessage );
// return the state if a new connection was happening
return bNewConnection;
}
void CServer::GetConCliParam ( CVector<CHostAddress>& vecHostAddresses,