source-engine/utils/vmpi/vmpi.cpp
FluorescentCIAAfricanAmerican 3bf9df6b27 1
2020-04-22 12:56:21 -04:00

2479 lines
63 KiB
C++

//========= Copyright Valve Corporation, All rights reserved. ============//
//
// Purpose: This module implements the subset of MPI that VRAD and VVIS use.
//
// $NoKeywords: $
//=============================================================================//
#include <windows.h>
#include <io.h>
#include <conio.h>
#include <sys/stat.h>
#include <stdio.h>
#include <direct.h>
#include "iphelpers.h"
#include "utlvector.h"
#include "utllinkedlist.h"
#include "vmpi.h"
#include "bitbuf.h"
#include "tier1/strtools.h"
#include "threadhelpers.h"
#include "IThreadedTCPSocket.h"
#include "vstdlib/random.h"
#include "vmpi_distribute_work.h"
#include "filesystem.h"
#include "checksum_md5.h"
#include "tslist.h"
#include "tier0/icommandline.h"
#define DEFAULT_MAX_WORKERS 32 // Unless they specify -mpi_MaxWorkers, it will stop accepting workers after it gets this many.
int g_nMaxWorkerCount = DEFAULT_MAX_WORKERS;
#define VMPI_INTERNAL_PACKET_ID 27
#define VMPI_INTERNAL_SUBPACKET_MACHINE_NAME 1
#define VMPI_INTERNAL_SUBPACKET_COMMAND_LINE 2
#define VMPI_INTERNAL_SUBPACKET_WAITING_FOR_COMMAND_LINE 3
#define VMPI_INTERNAL_SUBPACKET_GROUPED_PACKET 4
#define VMPI_INTERNAL_SUBPACKET_TIMING_WAIT_DONE 5
#define VMPI_INTERNAL_SUBPACKET_VERIFY_EXE_NAME 6
typedef CUtlVector<char> PersistentPacket;
CCriticalSection g_PersistentPacketsCS;
CUtlLinkedList<PersistentPacket*> g_PersistentPackets;
// Command-line parameters list.
#define VMPI_PARAM( paramName, paramFlags, helpText ) {paramName, paramFlags, "-"#paramName, helpText},
class CVMPIParam
{
public:
EVMPICmdLineParam m_eParam;
int m_ParamFlags;
const char *m_pName;
const char *m_pHelpText;
};
static CVMPIParam g_VMPIParams[] =
{
{k_eVMPICmdLineParam_FirstParam, 0, "k_eVMPICmdLineParam_FirstParam", "unused"},
{k_eVMPICmdLineParam_VMPIParam, 0, "mpi", "Enable VMPI."},
#include "vmpi_parameters.h"
};
#undef VMPI_PARAM
// ---------------------------------------------------------------------------------------- //
// Globals.
// ---------------------------------------------------------------------------------------- //
class CVMPIConnection;
// Used by -mpi_AutoRestart.
CUtlVector<char*> g_OriginalCommandLineParameters;
// This queues up all the incoming VMPI messages.
CCriticalSection g_VMPIMessagesCS;
CUtlLinkedList< CTCPPacket*, int > g_VMPIMessages;
CEvent g_VMPIMessagesEvent; // This is set when there are messages in the queue.
// These are used to notify the main thread when some socket had OnError() called on it.
CUtlLinkedList<CVMPIConnection*,int> g_ErrorSockets;
CEvent g_ErrorSocketsEvent;
CCriticalSection g_ErrorSocketsCS;
bool g_bTimingWaitDone = false;
bool g_bGroupPackets = false;
#define MAX_VMPI_CONNECTIONS 512
CVMPIConnection *g_Connections[MAX_VMPI_CONNECTIONS];
int g_nConnections = 0;
CCriticalSection g_ConnectionsCS;
// If true, then it will set certain thread priorities low.
bool g_bSetThreadPriorities = true;
VMPIDispatchFn g_VMPIDispatch[MAX_VMPI_PACKET_IDS];
CTSList<MessageBuffer *> g_DispatchBuffers;
VMPIRunMode g_VMPIRunMode = VMPI_RUN_NETWORKED;
VMPIFileSystemMode g_VMPIFileSystemMode = VMPI_FILESYSTEM_TCP;
static char g_GroupedPacketHeader[] = { VMPI_INTERNAL_PACKET_ID, VMPI_INTERNAL_SUBPACKET_GROUPED_PACKET };
static unsigned long g_LastFlushGroupedPacketsTime = 0;
// Set to true if we're running under the SDK (i.e. vmpi_transfer.exe is not found).
bool g_bVMPISDKMode = false;
bool g_bVMPISDKModeSet = false; // If g_bVMPISDKMode has not been set, then VMPI_IsSDKMode just looks for VMPI_Transfer (and doesn't check the command line).
int g_nBytesSent = 0;
int g_nMessagesSent = 0;
int g_nBytesReceived = 0;
int g_nMessagesReceived = 0;
int g_nMulticastBytesSent = 0;
int g_nMulticastBytesReceived = 0;
CUtlLinkedList<VMPI_Disconnect_Handler,int> g_DisconnectHandlers;
bool g_bUseMPI = false;
int g_iVMPIVerboseLevel = 0;
bool g_bMPIMaster = false;
bool g_bMPI_Stats = false;
bool g_bMPI_StatsTextOutput = false;
char g_CurrentStageString[128] = "";
CCriticalSection g_CurrentStageCS;
char g_MasterExeName[MAX_PATH];
bool g_bReceivedMasterExeName = false;
// Change our window text.
HINSTANCE g_hKernel32DLL = NULL;
typedef HWND (*GetConsoleWndFn)();
GetConsoleWndFn g_pConsoleWndFn = NULL;
// ---------------------------------------------------------------------------------------- //
// Classes.
// ---------------------------------------------------------------------------------------- //
// This class is used while discovering what files the workers need.
class CDependencyInfo
{
public:
class CDependencyFile
{
public:
char m_Name[MAX_PATH];
};
// This is the directory where the dependency files live (i.e. all the binaries that the workers need to run the job).
char m_DependencyFilesDir[MAX_PATH];
// "vrad.exe", "vvis.exe", etc.
char m_OriginalExeFilename[MAX_PATH];
CUtlVector<CDependencyFile*> m_Files;
public:
CDependencyFile* FindFile( const char *pFilename )
{
for ( int i=0; i < m_Files.Count(); i++ )
{
if ( stricmp( pFilename, m_Files[i]->m_Name ) == 0 )
return m_Files[i];
}
return NULL;
}
};
class CVMPIConnection : public ITCPSocketHandler
{
public:
CVMPIConnection( int iConnection )
{
m_iConnection = iConnection;
m_pSocket = NULL;
m_bIsAService = false;
char str[512];
Q_snprintf( str, sizeof( str ), "%d", iConnection );
SetMachineName( str );
m_JobWorkerID = 0xFFFFFFFF;
m_bNameSet = false;
}
~CVMPIConnection()
{
if ( m_pSocket )
m_pSocket->Release();
}
public:
void HandleDisconnect()
{
if ( m_pSocket )
{
// Copy out the error string.
CCriticalSectionLock csLock( &g_ErrorSocketsCS );
csLock.Lock();
char str[512];
Q_strncpy( str, m_ErrorString.Base(), sizeof( str ) );
csLock.Unlock();
// Tell the app.
FOR_EACH_LL( g_DisconnectHandlers, i )
g_DisconnectHandlers[i]( m_iConnection, str );
// Free our socket.
m_pSocket->Release();
m_pSocket = NULL;
}
}
IThreadedTCPSocket* GetSocket()
{
return m_pSocket;
}
void SetMachineName( const char *pName )
{
m_MachineName.CopyArray( pName, strlen( pName ) + 1 );
m_bNameSet = true;
}
const char* GetMachineName()
{
return m_MachineName.Base();
}
bool HasMachineNameBeenSet()
{
return m_bNameSet;
}
// ITCPSocketHandler implementation (thread-safe stuff).
public:
virtual void Init( IThreadedTCPSocket *pSocket )
{
m_pSocket = pSocket;
}
virtual void OnPacketReceived( CTCPPacket *pPacket )
{
// Set who this message came from.
pPacket->SetUserData( m_iConnection );
Assert( m_iConnection >= 0 && m_iConnection < 2048 );
// Store it in the global list.
CCriticalSectionLock csLock( &g_VMPIMessagesCS );
csLock.Lock();
g_VMPIMessages.AddToTail( pPacket );
if ( g_VMPIMessages.Count() == 1 )
g_VMPIMessagesEvent.SetEvent();
}
virtual void OnError( int errorCode, const char *pErrorString )
{
if ( !g_bMPIMaster )
{
Msg( "%s - CVMPIConnection::OnError( %s )\n", GetMachineName(), pErrorString );
}
CCriticalSectionLock csLock( &g_ErrorSocketsCS );
csLock.Lock();
m_ErrorString.CopyArray( pErrorString, strlen( pErrorString ) + 1 );
g_ErrorSockets.AddToTail( this );
// Notify the main thread that a socket is in trouble!
g_ErrorSocketsEvent.SetEvent();
// Make sure the main thread picks up this error soon.
InterlockedIncrement( &m_ErrorSignal );
}
public:
unsigned long m_JobWorkerID;
bool m_bIsAService; // If true, then this is just a service getting the files. Don't count it as an active worker.
CUtlVector<int> m_GroupedChunkLengths;
CUtlVector<void*> m_GroupedChunks;
private:
CUtlVector<char> m_MachineName;
CUtlVector<char> m_ErrorString;
long m_ErrorSignal;
int m_iConnection;
IThreadedTCPSocket *m_pSocket;
bool m_bNameSet;
};
class CVMPIConnectionCreator : public IHandlerCreator
{
public:
virtual ITCPSocketHandler* CreateNewHandler()
{
Assert( g_nConnections < MAX_VMPI_CONNECTIONS );
CVMPIConnection *pRet = new CVMPIConnection( g_nConnections );
g_Connections[g_nConnections++] = pRet;
return pRet;
}
};
// ---------------------------------------------------------------------------------------- //
// Helpers.
// ---------------------------------------------------------------------------------------- //
const char* VMPI_FindArg( int argc, char **argv, const char *pName, const char *pDefault )
{
for ( int i=0; i < argc; i++ )
{
if ( stricmp( argv[i], pName ) == 0 )
{
if ( (i+1) < argc )
return argv[i+1];
else
return pDefault;
}
}
return NULL;
}
void ParseOptions( int argc, char **argv )
{
if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_NoTimeout ) ) )
ThreadedTCP_EnableTimeouts( false );
if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_DontSetThreadPriorities ) ) )
{
Msg( "%s found.\n", VMPI_GetParamString( mpi_DontSetThreadPriorities ) );
g_bSetThreadPriorities = false;
ThreadedTCP_SetTCPSocketThreadPriorities( false );
}
if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_GroupPackets ) ) )
{
Msg( "%s found.\n", VMPI_GetParamString( mpi_GroupPackets ) );
g_bGroupPackets = true;
}
const char *pTransmitRate = VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_FileTransmitRate ), "1" );
if ( pTransmitRate )
{
extern int MULTICAST_TRANSMIT_RATE;
MULTICAST_TRANSMIT_RATE = atoi( pTransmitRate ) * 1024;
}
const char *pVerbose = VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Verbose ), "1" );
if ( pVerbose )
{
if ( pVerbose[0] == '1' )
g_iVMPIVerboseLevel = 1;
else if ( pVerbose[0] == '2' )
g_iVMPIVerboseLevel = 2;
}
if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Stats ) ) )
g_bMPI_Stats = true;
if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Stats_TextOutput ) ) )
g_bMPI_StatsTextOutput = true;
}
void SetupDependencyFilename( CDependencyInfo *pInfo, const char *pPatchDirectory )
{
char baseExeFilename[512];
if ( !GetModuleFileName( GetModuleHandle( NULL ), baseExeFilename, sizeof( baseExeFilename ) ) )
Error( "GetModuleFileName failed." );
// If they're in patch mode, then the dependency files come out of a directory they've passed in.
// Otherwise, the files come from the same exe dir we're in (like c:\valve\game\bin).
if ( pPatchDirectory )
{
V_strncpy( pInfo->m_DependencyFilesDir, pPatchDirectory, sizeof( pInfo->m_DependencyFilesDir ) );
}
else
{
V_strncpy( pInfo->m_DependencyFilesDir, baseExeFilename, sizeof( pInfo->m_DependencyFilesDir ) );
V_StripLastDir( pInfo->m_DependencyFilesDir, sizeof( pInfo->m_DependencyFilesDir ) );
}
// Get the exe filename.
V_strncpy( pInfo->m_OriginalExeFilename, V_UnqualifiedFileName( baseExeFilename ), sizeof( pInfo->m_OriginalExeFilename ) );
}
bool ReadString( char *pOut, int maxLen, FILE *fp )
{
if ( !fgets( pOut, maxLen, fp ) || pOut[0] == 0 )
return false;
int len = strlen( pOut );
if ( pOut[len - 1] == '\n' )
pOut[len - 1] = 0;
return true;
}
void ParseDependencyFile( CDependencyInfo *pInfo, const char *pDepFilename )
{
FILE *fp = fopen( pDepFilename, "rt" );
if ( !fp )
Error( "Can't find %s.", pDepFilename );
const char *pOptionalPrefix = "optional ";
char tempStr[MAX_PATH];
while ( ReadString( tempStr, sizeof( tempStr ), fp ) )
{
CDependencyInfo::CDependencyFile *pFile = new CDependencyInfo::CDependencyFile;
bool bOptional = false;
if ( strstr( tempStr, "optional " ) == tempStr )
{
bOptional = true;
Q_strncpy( pFile->m_Name, tempStr + strlen( pOptionalPrefix ), sizeof( pFile->m_Name ) );
}
else
{
Q_strncpy( pFile->m_Name, tempStr, sizeof( pFile->m_Name ) );
}
// Now get the file info.
char fullFilename[MAX_PATH];
V_ComposeFileName( pInfo->m_DependencyFilesDir, pFile->m_Name, fullFilename, sizeof( fullFilename ) );
if ( _access( fullFilename, 0 ) == 0 )
{
pInfo->m_Files.AddToTail( pFile );
}
else
{
delete pFile;
if ( !bOptional )
Error( "Can't find %s (listed in %s).", fullFilename, pDepFilename );
}
}
fclose( fp );
}
void SetupDependenciesForPatch( CDependencyInfo *pInfo, const char *pPatchDirectory )
{
char searchStr[MAX_PATH];
V_ComposeFileName( pPatchDirectory, "*.*", searchStr, sizeof( searchStr ) );
_finddata_t data;
long handle = _findfirst( searchStr, &data );
if ( handle != -1 )
{
do
{
if ( data.name[0] == '.' || (data.attrib & _A_SUBDIR) != 0 )
continue;
CDependencyInfo::CDependencyFile *pFile = new CDependencyInfo::CDependencyFile;
V_strncpy( pFile->m_Name, data.name, sizeof( pFile->m_Name ) );
pInfo->m_Files.AddToTail( pFile );
} while( _findnext( handle, &data ) == 0 );
_findclose( handle );
}
}
void SetupDependencyInfo( CDependencyInfo *pInfo, const char *pDependencyFilename, bool bPatchMode )
{
if ( bPatchMode )
{
const char *pPatchDirectory = pDependencyFilename;
SetupDependencyFilename( pInfo, pPatchDirectory );
SetupDependenciesForPatch( pInfo, pPatchDirectory );
}
else
{
SetupDependencyFilename( pInfo, NULL );
// Parse the dependency file.
char depFilename[MAX_PATH];
V_ComposeFileName( pInfo->m_DependencyFilesDir, pDependencyFilename, depFilename, sizeof( depFilename ) );
ParseDependencyFile( pInfo, depFilename );
}
}
int GetCurMicrosecondsAndSleep( int sleepLen )
{
Sleep( sleepLen );
CCycleCount cnt;
cnt.Sample();
return cnt.GetMicroseconds();
}
void CountActiveConnections( int *nRegularWorkers, int *nServiceDownloaders )
{
*nRegularWorkers = *nServiceDownloaders = 0;
int nTotalConnections = g_nConnections;
for ( int i=0; i < nTotalConnections; i++ )
{
if ( VMPI_IsProcConnected( i ) )
{
if ( VMPI_IsProcAService( i ) )
(*nServiceDownloaders)++;
else
(*nRegularWorkers)++;
}
}
}
// In this function, we update the window text to tell how many active workers there are.
void UpdateActiveConnectionsText()
{
if ( !g_bMPIMaster || !g_pConsoleWndFn )
return;
HWND hWnd = g_pConsoleWndFn();
if ( !hWnd )
return;
int nRegularWorkers, nDownloaders;
CountActiveConnections( &nRegularWorkers, &nDownloaders );
char str[512];
if ( g_bVMPISDKMode )
{
V_snprintf( str, sizeof( str ), "VMPI (SDK) - Workers: %d", nRegularWorkers );
}
else
{
V_snprintf( str, sizeof( str ), "VMPI - Workers: %d, Downloaders: %d", nRegularWorkers, nDownloaders );
}
SetWindowText( hWnd, str );
}
void VMPI_SendMachineNameTo( int iProc )
{
const char *pMyName = VMPI_GetLocalMachineName();
unsigned char packetData[512];
packetData[0] = VMPI_INTERNAL_PACKET_ID;
packetData[1] = VMPI_INTERNAL_SUBPACKET_MACHINE_NAME;
Q_strncpy( (char*)&packetData[2], pMyName, sizeof( packetData ) - 2 );
VMPI_SendData( packetData, 2 + strlen( pMyName ) + 1, iProc );
}
static CVMPIConnection* FindConnectionBySocket( IThreadedTCPSocket *pSocket, bool bLockConnections )
{
CCriticalSectionLock connectionsLock( &g_ConnectionsCS );
if ( bLockConnections )
connectionsLock.Lock();
for ( int i=0; i < g_nConnections; i++ )
if ( g_Connections[i]->GetSocket() == pSocket )
return g_Connections[i];
return NULL;
}
static char* CopyString( const char *pStr )
{
int len = V_strlen( pStr ) + 1;
char *pArg = new char[len];
Q_strncpy( pArg, pStr, len );
return pArg;
}
// ---------------------------------------------------------------------------------------- //
// Internal VMPI dispatch..
// ---------------------------------------------------------------------------------------- //
void VMPI_SetMachineName( int iProc, const char *pName );
CUtlVector<char*> g_WorkerCommandLine;
bool g_bReceivedWorkerCommandLine = false;
bool VMPI_InternalDispatchFn( MessageBuffer *pBuf, int iSource, int iPacketID )
{
if ( pBuf->getLen() >= 2 )
{
if ( pBuf->data[1] == VMPI_INTERNAL_SUBPACKET_MACHINE_NAME )
{
if ( pBuf->getLen() >= 3 )
{
VMPI_SetMachineName( iSource, &pBuf->data[2] );
return true;
}
}
else if ( pBuf->data[1] == VMPI_INTERNAL_SUBPACKET_WAITING_FOR_COMMAND_LINE )
{
if ( !VMPI_IsSDKMode() )
{
Warning( "Worker %d is running in SDK mode (and the master is not)!\n", iSource );
}
return true;
}
else if ( pBuf->data[1] == VMPI_INTERNAL_SUBPACKET_COMMAND_LINE )
{
pBuf->setOffset( 2 );
int nArgs;
pBuf->read( &nArgs, sizeof( nArgs ) );
for ( int i=0; i < nArgs; i++ )
{
char str[4096];
if ( pBuf->ReadString( str, sizeof( str ) ) == -1 )
Error( "Error in ReadString() while reading command line." );
g_WorkerCommandLine.AddToTail( CopyString( str ) );
}
g_bReceivedWorkerCommandLine = true;
return true;
}
else if ( pBuf->data[1] == VMPI_INTERNAL_SUBPACKET_VERIFY_EXE_NAME )
{
pBuf->setOffset( 2 );
if ( pBuf->ReadString( g_MasterExeName, sizeof( g_MasterExeName ) ) == -1 )
Error( "Error in ReadString() while reading VMPI_INTERNAL_SUBPACKET_VERIFY_EXE_NAME." );
g_bReceivedMasterExeName = true;
return true;
}
else if ( pBuf->data[1] == VMPI_INTERNAL_SUBPACKET_TIMING_WAIT_DONE )
{
g_bTimingWaitDone = true;
return true;
}
}
return false;
}
CDispatchReg g_VMPIInternalDispatchReg( VMPI_INTERNAL_PACKET_ID, VMPI_InternalDispatchFn ); // register to handle the messages we want
void VMPI_SendCommandLine( int argc, char **argv )
{
MessageBuffer mb;
char cPacketHeader[2] = {VMPI_INTERNAL_PACKET_ID, VMPI_INTERNAL_SUBPACKET_COMMAND_LINE};
mb.write( cPacketHeader, sizeof( cPacketHeader ) );
mb.write( &argc, sizeof( argc ) );
for ( int i=0; i < argc; i++ )
mb.WriteString( argv[i] );
VMPI_SendData( mb.data, mb.getLen(), VMPI_PERSISTENT );
}
void VMPI_ReceiveCommandLine()
{
// For verification purposes, tell the master we're trying to get the command line.
unsigned char chData[2] = {VMPI_INTERNAL_PACKET_ID, VMPI_INTERNAL_SUBPACKET_WAITING_FOR_COMMAND_LINE};
VMPI_SendData( chData, sizeof( chData ), VMPI_MASTER_ID );
double startTime = Plat_FloatTime();
while ( !g_bReceivedWorkerCommandLine )
{
if ( Plat_FloatTime() - startTime > 30 )
Error( "VMPI_ReceiveCommandLine: timeout. Is the master running in SDK mode?" );
VMPI_DispatchNextMessage( 10 * 1000 );
}
}
void VMPI_SendExeName()
{
MessageBuffer mb;
char cPacketHeader[2] = {VMPI_INTERNAL_PACKET_ID, VMPI_INTERNAL_SUBPACKET_VERIFY_EXE_NAME};
mb.write( cPacketHeader, sizeof( cPacketHeader ) );
char baseExeFilename[MAX_PATH], fileBase[MAX_PATH];
if ( !GetModuleFileName( GetModuleHandle( NULL ), baseExeFilename, sizeof( baseExeFilename ) ) )
Error( "VMPI_CheckSDKMode -> GetModuleFileName failed." );
V_FileBase( baseExeFilename, fileBase, sizeof( fileBase ) );
mb.WriteString( fileBase );
VMPI_SendData( mb.data, mb.getLen(), VMPI_PERSISTENT );
}
void VMPI_ReceiveExeName()
{
double startTime = Plat_FloatTime();
while ( !g_bReceivedMasterExeName )
{
if ( Plat_FloatTime() - startTime > 30 )
Error( "VMPI_ReceiveExeName: timeout." );
VMPI_DispatchNextMessage( 10 * 1000 );
}
// Now compare the exe name we got with our own.
char baseExeFilename[MAX_PATH], fileBase[MAX_PATH];
if ( !GetModuleFileName( GetModuleHandle( NULL ), baseExeFilename, sizeof( baseExeFilename ) ) )
Error( "VMPI_CheckSDKMode -> GetModuleFileName failed." );
// Unless we're a vmpi_transfer.. vmpi_transfer can always connect.
V_FileBase( baseExeFilename, fileBase, sizeof( fileBase ) );
if ( V_stricmp( fileBase, "vmpi_transfer" ) != 0 )
{
if ( V_stricmp( fileBase, g_MasterExeName ) != 0 )
{
Error( "VMPI_ReceiveExeName: mismatched exe names (master: %s, me: %s).\nThis usually just means the master finished"
" a job like vvis really fast and started a vrad immediately, and an old vvis worker connected to the new vrad job.",
g_MasterExeName, fileBase );
}
}
}
// ---------------------------------------------------------------------------------------- //
// CMasterBroadcaster
// This class broadcasts messages looking for workers. The app updates it as often as possible
// and it'll add workers as necessary.
// ---------------------------------------------------------------------------------------- //
#define MASTER_BROADCAST_INTERVAL 600 // Send every N milliseconds.
class CMasterBroadcaster
{
public:
CMasterBroadcaster();
~CMasterBroadcaster();
bool Init( int argc, char **argv, const char *pDependencyFilename, int nMaxWorkers, VMPIRunMode runMode, bool bPatchMode );
void Term();
// What port is it listening on?
int GetListenPort() const;
// These can be used to allow more workers on or filter who's able to connect
int GetMaxWorkers() const;
void IncreaseMaxWorkers( int count );
void SetPassword( const char *pPassword );
void SetNoTimeoutOption();
private:
void GetPatchWorkerList( int argc, char **argv );
private:
class CMasterBroadcastInfo
{
public:
int m_JobID[4];
char m_Password[256];
char m_WorkerExeFilename[MAX_PATH];
CUtlVector<char*> m_Args;
char m_PatchVersion[32]; // 0 if not patching.
bool m_bForcePatch;
};
void ThreadFn();
static DWORD WINAPI StaticThreadFn( LPVOID lpParameter );
bool Update();
void BuildBroadcastPacket( bf_write &buf );
private:
ITCPConnectSocket *m_pListenSocket;
ITCPConnectSocket *m_pDownloaderListenSocket;
ISocket *m_pSocket;
DWORD m_LastSendTime;
CMasterBroadcastInfo m_BroadcastInfo;
CUtlVector<CIPAddr> m_PatchWorkerIPs; // If in patch mode, these are the IPs we send the job request to (instead of broadcasting).
bool m_bPatching;
CVMPIConnectionCreator m_ConnectionCreator;
int m_nMaxWorkers;
HANDLE m_hThread;
CEvent m_hShutdownEvent;
CEvent m_hShutdownReply;
VMPIRunMode m_RunMode;
int m_iListenPort;
int m_iDownloaderListenPort;
};
CMasterBroadcaster::CMasterBroadcaster()
{
m_pListenSocket = NULL;
m_pDownloaderListenSocket = NULL;
m_pSocket = NULL;
m_iListenPort = -1;
m_iDownloaderListenPort = -1;
}
CMasterBroadcaster::~CMasterBroadcaster()
{
Term();
}
void CMasterBroadcaster::GetPatchWorkerList( int argc, char **argv )
{
m_PatchWorkerIPs.Purge();
for ( int i=0; i < argc-1; i++ )
{
if ( V_stricmp( argv[i], "-mpi_PatchWorkers" ) == 0 )
{
int workerCount = atoi( argv[i+1] );
for ( int iWorker=0; iWorker < workerCount; iWorker++ )
{
int iArg = i+2 + iWorker;
if ( iArg >= argc )
Error( "-mpi_PatchWorkers: %d specified for count, but not enough IPs following.\n", workerCount );
int a, b, c, d;
const char *pArg = argv[iArg];
sscanf( pArg, "%d.%d.%d.%d", &a, &b, &c, &d );
CIPAddr addr;
addr.Init( a, b, c, d, 0 );
m_PatchWorkerIPs.AddToTail( addr );
}
return;
}
}
}
bool CMasterBroadcaster::Init(
int argc,
char **argv,
const char *pDependencyFilename,
int nMaxWorkers,
VMPIRunMode runMode,
bool bPatchMode )
{
m_RunMode = runMode;
m_nMaxWorkers = nMaxWorkers;
// Open the file that tells us which binaries we depend on.
CDependencyInfo dependencyInfo;
if ( m_RunMode == VMPI_RUN_NETWORKED && !g_bVMPISDKMode )
{
SetupDependencyInfo( &dependencyInfo, pDependencyFilename, bPatchMode );
}
m_pListenSocket = NULL;
m_pDownloaderListenSocket = NULL;
const char *pPortStr = VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Port ) );
if ( pPortStr )
{
m_iListenPort = atoi( pPortStr );
m_iDownloaderListenPort = m_iListenPort + 1;
m_pListenSocket = ThreadedTCP_CreateListener( &m_ConnectionCreator, m_iListenPort );
if ( !g_bVMPISDKMode )
{
m_pDownloaderListenSocket = ThreadedTCP_CreateListener( &m_ConnectionCreator, m_iDownloaderListenPort );
}
}
else
{
// Create a socket to listen on.
CCycleCount cnt;
cnt.Sample();
int iTime = (int)cnt.GetMicroseconds();
srand( (unsigned)iTime );
for ( int iTest=VMPI_MASTER_FIRST_PORT; iTest <= VMPI_MASTER_LAST_PORT; iTest++ )
{
m_iListenPort = iTest;
m_pListenSocket = ThreadedTCP_CreateListener( &m_ConnectionCreator, m_iListenPort );
if ( m_pListenSocket )
break;
}
// No need to create the downloader in SDK mode.
if ( m_pListenSocket && !g_bVMPISDKMode )
{
for ( int iTest=m_iListenPort+1; iTest <= VMPI_MASTER_LAST_PORT; iTest++ )
{
m_iDownloaderListenPort = iTest;
if ( m_iDownloaderListenPort == m_iListenPort )
continue;
m_pDownloaderListenSocket = ThreadedTCP_CreateListener( &m_ConnectionCreator, m_iDownloaderListenPort );
if ( m_pDownloaderListenSocket )
break;
}
}
}
if ( !m_pListenSocket || (!g_bVMPISDKMode && !m_pDownloaderListenSocket) )
{
Error( "Can't bind a listen socket in port range [%d, %d].", VMPI_MASTER_PORT_FIRST, VMPI_MASTER_PORT_LAST );
}
// Create a socket to broadcast from unless we're in the SDK in which case we don't broadcast.
m_bPatching = false;
if ( m_RunMode == VMPI_RUN_NETWORKED && !g_bVMPISDKMode )
{
m_pSocket = CreateIPSocket();
if ( !m_pSocket->BindToAny( 0 ) )
Error( "MPI_Init_Master: can't bind a socket" );
m_BroadcastInfo.m_bForcePatch = false;
if ( bPatchMode )
{
m_bPatching = true;
if ( VMPI_FindArg( argc, argv, "-mpi_ForcePatch", NULL ) )
m_BroadcastInfo.m_bForcePatch = true;
const char *pArg = VMPI_FindArg( argc, argv, "-mpi_PatchVersion", "0" );
float iPatchVersion = atof( pArg );
if ( iPatchVersion <= 0 || iPatchVersion >= ((1 << 15) - 1) )
{
Error( "-mpi_PatchVersion <val> - val must be between 1.0 and 32767.0" );
}
V_strncpy( m_BroadcastInfo.m_PatchVersion, pArg, sizeof( m_BroadcastInfo.m_PatchVersion ) );
}
else
{
m_BroadcastInfo.m_PatchVersion[0] = 0;
}
// Come up with a unique job ID.
m_BroadcastInfo.m_JobID[0] = GetCurMicrosecondsAndSleep( 1 );
m_BroadcastInfo.m_JobID[1] = GetCurMicrosecondsAndSleep( 1 );
m_BroadcastInfo.m_JobID[2] = GetCurMicrosecondsAndSleep( 1 );
m_BroadcastInfo.m_JobID[3] = GetCurMicrosecondsAndSleep( 1 );
const char *pPassword = VMPI_FindArg( argc, argv, "-mpi_pw", "" );
Q_strncpy( m_BroadcastInfo.m_Password, pPassword ? pPassword : "", sizeof( m_BroadcastInfo.m_Password ) );
Q_strncpy( m_BroadcastInfo.m_WorkerExeFilename, dependencyInfo.m_OriginalExeFilename, sizeof( m_BroadcastInfo.m_WorkerExeFilename ) );
// Store the command-line args.
m_BroadcastInfo.m_Args.Purge();
for ( int i=1; i < argc; i++ )
{
m_BroadcastInfo.m_Args.AddToTail( CopyString( argv[i] ) );
}
// 0th arg is the exe name.
m_BroadcastInfo.m_Args.InsertBefore( 0, CopyString( m_BroadcastInfo.m_WorkerExeFilename ) );
// Now add arguments for each file they need to transmit. The service will use this to get all the files from the master before it starts the app.
for ( int i=0; i < dependencyInfo.m_Files.Count(); i++ )
{
m_BroadcastInfo.m_Args.InsertAfter( 0, "-mpi_file" );
m_BroadcastInfo.m_Args.InsertAfter( 1, CopyString( dependencyInfo.m_Files[i]->m_Name ) );
}
// Add -mpi_filebase so it can use absolute paths with the filesystem so we get the exact right set of files.
m_BroadcastInfo.m_Args.InsertAfter( 0, "-mpi_filebase" );
m_BroadcastInfo.m_Args.InsertAfter( 1, CopyString( dependencyInfo.m_DependencyFilesDir ) );
if ( bPatchMode )
{
GetPatchWorkerList( argc, argv );
}
}
// Add ourselves as the first process (rank 0).
m_ConnectionCreator.CreateNewHandler();
// Initiate as many connections as we can for a few seconds.
m_LastSendTime = Plat_MSTime() - MASTER_BROADCAST_INTERVAL*2;
m_hShutdownEvent.Init( false, false );
m_hShutdownReply.Init( false, false );
DWORD dwThreadID = 0;
m_hThread = CreateThread(
NULL,
0,
&CMasterBroadcaster::StaticThreadFn,
this,
0,
&dwThreadID );
if ( m_hThread )
{
SetThreadPriority( m_hThread, THREAD_PRIORITY_HIGHEST );
return true;
}
else
{
return false;
}
}
void CMasterBroadcaster::BuildBroadcastPacket( bf_write &buf )
{
// Broadcast out to tell all the machines we want workers.
buf.WriteByte( VMPI_PROTOCOL_VERSION );
buf.WriteString( m_BroadcastInfo.m_Password );
if ( m_BroadcastInfo.m_PatchVersion[0] == 0 )
buf.WriteByte( VMPI_LOOKING_FOR_WORKERS );
else
buf.WriteByte( VMPI_SERVICE_PATCH );
buf.WriteString( m_BroadcastInfo.m_PatchVersion );
buf.WriteLong( m_iListenPort ); // Tell the port that we're listening on.
buf.WriteLong( m_BroadcastInfo.m_JobID[0] );
buf.WriteLong( m_BroadcastInfo.m_JobID[1] );
buf.WriteLong( m_BroadcastInfo.m_JobID[2] );
buf.WriteLong( m_BroadcastInfo.m_JobID[3] );
buf.WriteWord( m_BroadcastInfo.m_Args.Count() + 2 );
// Write the alternate exe name.
buf.WriteString( m_BroadcastInfo.m_WorkerExeFilename );
// Write the machine name of the master into the command line. It's ignored by the code, but it's useful
// if a job crashes the workers - by looking at the command line in vmpi_service, you can see who ran the job.
buf.WriteString( "-mpi_MasterName" );
buf.WriteString( VMPI_GetLocalMachineName() );
for ( int i=1; i < m_BroadcastInfo.m_Args.Count(); i++ )
buf.WriteString( m_BroadcastInfo.m_Args[i] );
buf.WriteByte( (unsigned char)m_BroadcastInfo.m_bForcePatch );
buf.WriteShort( m_iDownloaderListenPort ); // Tell the port that we're listening for downloaders on.
}
bool CMasterBroadcaster::Update()
{
CCriticalSectionLock connectionsLock( &g_ConnectionsCS );
connectionsLock.Lock();
// Don't accept any more connections when we've hit the limit.
int nActiveConnections, nServiceDownloaders;
CountActiveConnections( &nActiveConnections, &nServiceDownloaders );
if ( nActiveConnections >= m_nMaxWorkers )
return false;
// Only broadcast our presence so often.
if ( m_pSocket )
{
DWORD curTime = Plat_MSTime();
if ( curTime - m_LastSendTime >= MASTER_BROADCAST_INTERVAL )
{
char packetData[512];
bf_write packetBuf( "packetBuf", packetData, sizeof( packetData ) );
BuildBroadcastPacket( packetBuf );
for ( int iBroadcastPort=VMPI_SERVICE_PORT; iBroadcastPort <= VMPI_LAST_SERVICE_PORT; iBroadcastPort++ )
{
if ( m_bPatching )
{
// Only send to this specific list of workers if necessary.
for ( int i=0; i < m_PatchWorkerIPs.Count(); i++ )
{
CIPAddr addr = m_PatchWorkerIPs[i];
addr.port = iBroadcastPort;
m_pSocket->SendTo( &addr, packetBuf.GetBasePointer(), packetBuf.GetNumBytesWritten() );
}
}
else
{
m_pSocket->Broadcast( packetBuf.GetBasePointer(), packetBuf.GetNumBytesWritten(), iBroadcastPort );
}
}
// We don't want them to keep patching over and over.
if ( m_PatchWorkerIPs.Count() > 0 && m_BroadcastInfo.m_bForcePatch )
m_PatchWorkerIPs.Purge();
m_LastSendTime = curTime;
}
}
// First look for normal workers.
IThreadedTCPSocket *pNewConn = NULL;
bool bRet = m_pListenSocket->Update( &pNewConn, 0 );
// Now look for downloaders.
if ( !bRet || !pNewConn )
{
if ( m_pDownloaderListenSocket )
{
int nDownloadersAllowed = (m_nMaxWorkers - nActiveConnections) + 8; // Don't allow too many downloaders.
if ( nServiceDownloaders < nDownloadersAllowed )
bRet = m_pDownloaderListenSocket->Update( &pNewConn, 0 );
}
}
if ( bRet && pNewConn )
{
// Mark this guy as a downloader if necessary.
CIPAddr remoteAddr = pNewConn->GetRemoteAddr();
if ( remoteAddr.port >= VMPI_SERVICE_DOWNLOADER_PORT_FIRST && remoteAddr.port <= VMPI_SERVICE_DOWNLOADER_PORT_LAST )
{
CVMPIConnection *pVMPIConnection = FindConnectionBySocket( pNewConn, false );
if ( pVMPIConnection )
pVMPIConnection->m_bIsAService = true;
}
// Send this guy all the persistent packets.
CCriticalSectionLock csLock( &g_PersistentPacketsCS );
csLock.Lock();
FOR_EACH_LL( g_PersistentPackets, iPacket )
{
PersistentPacket *pPacket = g_PersistentPackets[iPacket];
VMPI_SendData( pPacket->Base(), pPacket->Count(), g_nConnections-1 );
}
UpdateActiveConnectionsText();
return true;
}
else
{
return false;
}
}
void CMasterBroadcaster::ThreadFn()
{
// Update every 100ms or until the main thread tells us to go away.
while ( WaitForSingleObject( m_hShutdownEvent.GetEventHandle(), 20 ) == WAIT_TIMEOUT )
{
DWORD startTime = GetTickCount();
while ( Update() && (GetTickCount() - startTime) < 500 )
{
}
}
m_hShutdownReply.SetEvent();
}
DWORD CMasterBroadcaster::StaticThreadFn( LPVOID lpParameter )
{
((CMasterBroadcaster*)lpParameter)->ThreadFn();
return 0;
}
void CMasterBroadcaster::Term()
{
// Shutdown the update thread.
if ( m_hThread )
{
m_hShutdownEvent.SetEvent();
WaitForSingleObject( m_hThread, INFINITE );
CloseHandle( m_hThread );
m_hThread = 0;
}
if ( m_pSocket )
{
m_pSocket->Release();
m_pSocket = NULL;
}
if ( m_pListenSocket )
{
m_pListenSocket->Release();
m_pListenSocket = NULL;
}
if ( m_pDownloaderListenSocket )
{
m_pDownloaderListenSocket->Release();
m_pDownloaderListenSocket = NULL;
}
m_iListenPort = -1;
m_iDownloaderListenPort = -1;
}
int CMasterBroadcaster::GetListenPort() const
{
return m_iListenPort;
}
int CMasterBroadcaster::GetMaxWorkers() const
{
return m_nMaxWorkers;
}
void CMasterBroadcaster::IncreaseMaxWorkers( int count )
{
CCriticalSectionLock connectionsLock( &g_ConnectionsCS );
connectionsLock.Lock();
m_nMaxWorkers = min( MAX_VMPI_CONNECTIONS, m_nMaxWorkers + count );
}
void CMasterBroadcaster::SetPassword( const char *pPassword )
{
CCriticalSectionLock connectionsLock( &g_ConnectionsCS );
connectionsLock.Lock();
Q_strncpy( m_BroadcastInfo.m_Password, pPassword, sizeof( m_BroadcastInfo.m_Password ) );
}
void CMasterBroadcaster::SetNoTimeoutOption()
{
CCriticalSectionLock connectionsLock( &g_ConnectionsCS );
connectionsLock.Lock();
// Don't re-add the option if it's already there.
for ( int i=1; i < m_BroadcastInfo.m_Args.Count(); i++ )
{
if ( Q_stricmp( m_BroadcastInfo.m_Args[i], VMPI_GetParamString( mpi_NoTimeout ) ) == 0 )
return;
}
m_BroadcastInfo.m_Args.InsertAfter( 0, (char*)VMPI_GetParamString( mpi_NoTimeout ) );
}
CMasterBroadcaster g_MasterBroadcaster;
// ---------------------------------------------------------------------------------------- //
// CDispatchReg.
// ---------------------------------------------------------------------------------------- //
CDispatchReg::CDispatchReg( int iPacketID, VMPIDispatchFn fn )
{
Assert( iPacketID >= 0 && iPacketID < MAX_VMPI_PACKET_IDS );
Assert( !g_VMPIDispatch[iPacketID] );
g_VMPIDispatch[iPacketID] = fn;
}
void VMPI_HandleTimingWait_Worker()
{
if ( VMPI_IsParamUsed( mpi_TimingWait ) )
{
Msg( "-mpi_TimingWait specified. Waiting for master to start..." );
// Wait for the signal to go.
while ( !g_bTimingWaitDone )
{
VMPI_DispatchNextMessage( 50 );
}
Msg( "\n ");
}
}
void VMPI_HandleTimingWait_Master()
{
if ( VMPI_IsParamUsed( mpi_TimingWait ) )
{
Msg( "-mpi_TimingWait specified. Waiting for a keypress to continue... " );
getch();
Msg( "\n" );
unsigned char cPacket[2] = { VMPI_INTERNAL_PACKET_ID, VMPI_INTERNAL_SUBPACKET_TIMING_WAIT_DONE };
VMPI_SendData( cPacket, sizeof( cPacket ), VMPI_PERSISTENT );
}
}
// ---------------------------------------------------------------------------------------- //
// Helpers.
// ---------------------------------------------------------------------------------------- //
bool MPI_Init_Worker( int &argc, char **&argv, const CIPAddr &masterAddr, bool bConnectingAsService )
{
g_bMPIMaster = false;
// Make a connector to try connect to the master.
CVMPIConnectionCreator connectionCreator;
int iFirstPort = VMPI_WORKER_PORT_FIRST;
int iLastPort = VMPI_WORKER_PORT_LAST;
if ( bConnectingAsService )
{
iFirstPort = VMPI_SERVICE_DOWNLOADER_PORT_FIRST;
iLastPort = VMPI_SERVICE_DOWNLOADER_PORT_LAST;
}
// Now wait for a connection.
int nAttempts = 1;
Retry:;
ITCPConnectSocket *pConnectSocket = NULL;
int iPort;
for ( iPort=iFirstPort; iPort <= iLastPort; iPort++ )
{
pConnectSocket = ThreadedTCP_CreateConnector(
masterAddr,
CIPAddr( 0, 0, 0, 0, iPort ),
&connectionCreator );
if ( pConnectSocket )
break;
}
if ( !pConnectSocket )
{
Error( "Can't bind a port in range [%d, %d].", iFirstPort, iLastPort );
}
CWaitTimer wait( 3 );
while ( 1 )
{
IThreadedTCPSocket *pSocket = NULL;
if ( pConnectSocket->Update( &pSocket, 100 ) )
{
if ( pSocket )
{
// Send the master our machine name.
VMPI_SendMachineNameTo( VMPI_MASTER_ID );
// Verify that the exe is correct.
VMPI_ReceiveExeName();
if ( g_bVMPISDKMode )
{
VMPI_ReceiveCommandLine();
CommandLine()->CreateCmdLine( g_WorkerCommandLine.Count(), g_WorkerCommandLine.Base() );
argc = g_WorkerCommandLine.Count();
argv = g_WorkerCommandLine.Base();
}
ParseOptions( g_WorkerCommandLine.Count(), g_WorkerCommandLine.Base() );
for ( int i=0; i < g_WorkerCommandLine.Count(); i++ )
{
Msg( "arg %d: %s\n", i, g_WorkerCommandLine[i] );
}
VMPI_HandleTimingWait_Worker();
return true;
}
}
else
{
pConnectSocket->Release();
Error( "ITCPConnectSocket::Update() errored out" );
}
if( wait.ShouldKeepWaiting() )
Sleep( 100 );
else
break;
};
// Never made a connection, shucks.
pConnectSocket->Release();
if ( VMPI_IsParamUsed( mpi_Retry ) )
{
Msg( "%s found. Retrying connection to %d.%d.%d.%d:%d (attempt %d).\n", VMPI_GetParamString( mpi_Retry ), masterAddr.ip[0], masterAddr.ip[1], masterAddr.ip[2], masterAddr.ip[3], masterAddr.port, nAttempts++ );
goto Retry;
}
return false;
}
bool SpawnLocalWorker( int argc, char **argv, int iListenPort, bool bShowConsoleWindow )
{
char commandLine[4096];
commandLine[0] = 0;
// Add the -mpi_worker argument in, then launch the process.
for ( int i=0; i < 9999999; i++ )
{
char argStr[512];
if ( i == 1 )
{
Q_snprintf( argStr, sizeof( argStr ), "-mpi_worker 127.0.0.1:%d ", iListenPort );
Q_strncat( commandLine, argStr, sizeof( commandLine ), COPY_ALL_CHARACTERS );
Q_strncat( commandLine, "-allowdebug ", sizeof( commandLine ), COPY_ALL_CHARACTERS );
// Add -mpi_SDKMode if it's needed. This would mostly only occur in a debugging situation
// (someone running out of rel using -mpi_AutoLocalWorker).
if ( VMPI_IsSDKMode() && !VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_SDKMode ), "" ) )
{
Q_strncat( commandLine, VMPI_GetParamString( mpi_SDKMode ), sizeof( commandLine ), COPY_ALL_CHARACTERS );
}
}
if ( i >= argc )
break;
Q_snprintf( argStr, sizeof( argStr ), "\"%s\" ", argv[i] );
Q_strncat( commandLine, argStr, sizeof( commandLine ), COPY_ALL_CHARACTERS );
}
char workingDir[1024];
if ( !_getcwd( workingDir, sizeof( workingDir ) ) )
{
Warning( "_getcwd() failed.\n" );
return false;
}
STARTUPINFO si;
memset( &si, 0, sizeof( si ) );
si.cb = sizeof( si );
PROCESS_INFORMATION pi;
memset( &pi, 0, sizeof( pi ) );
if ( CreateProcess(
NULL,
commandLine,
NULL, // security
NULL,
TRUE,
(bShowConsoleWindow ? CREATE_NEW_CONSOLE : CREATE_NO_WINDOW) | IDLE_PRIORITY_CLASS, // flags
NULL, // environment
workingDir, // current directory (use c:\\ because we don't want it to accidentally share
// DLLs like vstdlib with us).
&si,
&pi ) )
{
return true;
}
else
{
char errStr[1024];
IP_GetLastErrorString( errStr, sizeof( errStr ) );
Warning( " - ERROR in CreateProcess (%s)!\n", errStr );
return false;
}
}
bool InitMaster( int argc, char **argv, const char *pDependencyFilename, VMPIRunMode runMode, bool bPatchMode )
{
int nMaxWorkers = -1;
const char *pProcCount = VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_WorkerCount ) );
if ( pProcCount )
{
nMaxWorkers = atoi( pProcCount );
Warning( "%s: waiting for %d processes to join.\n", VMPI_GetParamString( mpi_WorkerCount ), nMaxWorkers );
}
else
{
nMaxWorkers = DEFAULT_MAX_WORKERS;
}
nMaxWorkers = clamp( nMaxWorkers, 2, MAX_VMPI_CONNECTIONS );
g_bMPIMaster = true;
g_nMaxWorkerCount = nMaxWorkers;
if ( argc <= 0 )
Error( "MPI_Init_Master: argc <= 0!" );
ParseOptions( argc, argv );
// Send the base filename of the exe we're running. Sometimes if we run vvis followed by vrad
// really quickly, the old vvis workers can connect to the vrad process and mess with it.
VMPI_SendExeName();
// In SDK mode, the master sends the command line to the workers since
// the workers weren't given a full command line by vmpi_service.
if ( VMPI_IsSDKMode() )
{
VMPI_SendCommandLine( argc, argv );
}
if ( !g_MasterBroadcaster.Init( argc, argv, pDependencyFilename, nMaxWorkers, runMode, bPatchMode ) )
return false;
bool bRet;
if ( runMode == VMPI_RUN_LOCAL )
{
bRet = SpawnLocalWorker( argc, argv, g_MasterBroadcaster.GetListenPort(), false );
}
else
{
if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_AutoLocalWorker ), "" ) )
{
Msg( "%s found. Spawning a local worker automatically.\n", VMPI_GetParamString( mpi_AutoLocalWorker ) );
SpawnLocalWorker( 1, argv, g_MasterBroadcaster.GetListenPort(), true );
}
bRet = true;
}
VMPI_HandleTimingWait_Master();
return bRet;
}
void VMPI_InitGlobals( int argc, char **argv, VMPIRunMode runMode )
{
g_bUseMPI = true;
g_VMPIRunMode = runMode;
// Init event objects.
g_VMPIMessagesEvent.Init( false, false );
g_ErrorSocketsEvent.Init( false, false );
// Load this for GetConsoleWindow().
g_hKernel32DLL = LoadLibrary( "kernel32.dll" );
if ( g_hKernel32DLL )
{
g_pConsoleWndFn = (GetConsoleWndFn)GetProcAddress( g_hKernel32DLL, "GetConsoleWindow" );
}
#if defined( _DEBUG )
for ( int iArg=0; iArg < argc; iArg++ )
{
Warning( "%s\n", argv[iArg] );
}
Warning( "\n" );
#endif
}
bool VMPI_CheckForNonSDKExecutables()
{
char baseExeFilename[512];
if ( !GetModuleFileName( GetModuleHandle( NULL ), baseExeFilename, sizeof( baseExeFilename ) ) )
Error( "VMPI_CheckSDKMode -> GetModuleFileName failed." );
V_StripLastDir( baseExeFilename, sizeof( baseExeFilename ) );
V_AppendSlash( baseExeFilename, sizeof( baseExeFilename ) );
V_strncat( baseExeFilename, "mysql_wrapper.dll", sizeof( baseExeFilename ) );
// If vmpi_transfer.exe doesn't exist, then we assume we're in SDK mode.
return ( _access( baseExeFilename, 0 ) == 0 );
}
bool IsValidSDKBinPath( CUtlVector< char* > &outStrings, int *pError )
{
*pError = 0;
// Minimum must have drive:/basedir/steamapps/name/sourcesdk/bin/[ep1|orangebox]/bin/exename
if ( outStrings.Count() < 9 )
{
*pError = 0;
return false;
}
if ( V_stricmp( outStrings[outStrings.Count()-2], "bin" ) != 0 )
{
*pError = 1;
return false;
}
if ( V_stricmp( outStrings[outStrings.Count()-5], "sourcesdk" ) != 0 )
{
*pError = 2;
return false;
}
if ( V_stricmp( outStrings[outStrings.Count()-7], "steamapps" ) != 0 )
{
*pError = 3;
return false;
}
// Check the last-access date on clientregistry.blob
char baseSteamPath[MAX_PATH];
V_strncpy( baseSteamPath, outStrings[0], sizeof( baseSteamPath) );
for ( int i=1; i < outStrings.Count() - 7; i++ )
{
V_AppendSlash( baseSteamPath, sizeof( baseSteamPath ) );
V_strncat( baseSteamPath, outStrings[i], sizeof( baseSteamPath ) );
}
char blobPath[MAX_PATH];
V_ComposeFileName( baseSteamPath, "ClientRegistry.blob", blobPath, sizeof( blobPath ) );
struct _stat results;
if ( _stat( blobPath, &results ) != 0 )
{
*pError = 4;
return false;
}
long curTime;
VCRHook_Time( &curTime );
int nSecondsSinceLastSteamAccess = curTime - results.st_mtime;
int nSecondsPerDay = 60 * 60 * 24;
int nMaxDaysUnaccessed = 10;
if ( nSecondsSinceLastSteamAccess > nSecondsPerDay*nMaxDaysUnaccessed )
{
*pError = 5; // NOTE: don't change this error code because the outer function checks for it.
return false;
}
// Check for some of the files under sourcesdk_content.
char sourcesdkContentPath[MAX_PATH];
V_strncpy( sourcesdkContentPath, outStrings[0], sizeof( sourcesdkContentPath ) );
for ( int i=1; i < outStrings.Count() - 5; i++ )
{
V_AppendSlash( sourcesdkContentPath, sizeof( sourcesdkContentPath ) );
V_strncat( sourcesdkContentPath, outStrings[i], sizeof( sourcesdkContentPath ) );
}
V_AppendSlash( sourcesdkContentPath, sizeof( sourcesdkContentPath ) );
V_strncat( sourcesdkContentPath, "sourcesdk_content", sizeof( sourcesdkContentPath ) );
char tempFilename[MAX_PATH], mapsrcFilename[MAX_PATH];
V_snprintf( tempFilename, sizeof( tempFilename ), "cstrike%cmapsrc", CORRECT_PATH_SEPARATOR );
V_ComposeFileName( sourcesdkContentPath, tempFilename, mapsrcFilename, sizeof( mapsrcFilename ) );
if ( _access( mapsrcFilename, 0 ) != 0 )
{
*pError = 6;
return false;
}
return true;
}
void VerifyValidSDKMode()
{
// Make sure we're running out of the SourceSDK directory and that our SDK directories are filled out.
char baseExeFilename[MAX_PATH];
if ( !GetModuleFileName( GetModuleHandle( NULL ), baseExeFilename, sizeof( baseExeFilename ) ) )
Error( "VerifyValidSDKMode: GetModuleFileName failed." );
V_FixSlashes( baseExeFilename );
CUtlVector< char* > outStrings;
char strSlash[2] = {CORRECT_PATH_SEPARATOR, 0};
V_SplitString( baseExeFilename, strSlash, outStrings );
int err;
if ( !IsValidSDKBinPath( outStrings, &err ) )
{
outStrings.PurgeAndDeleteElements();
if ( err == 5 )
Error( "VMPI running in SDK mode but Steam hasn't been run recently. Please run Steam and retry." );
else
Error( "VMPI running in SDK mode but incorrect SDK install detected (error %d).", err );
}
}
void VMPI_CheckSDKMode( int argc, char **argv )
{
g_bVMPISDKMode = !VMPI_CheckForNonSDKExecutables();
g_bVMPISDKModeSet = true;
// Also check for -mpi_sdkmode (only used in testing).
if ( !g_bVMPISDKMode )
{
if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_SDKMode ), "" ) )
g_bVMPISDKMode = true;
}
if ( g_bVMPISDKMode )
{
VerifyValidSDKMode();
}
if ( g_bVMPISDKMode )
{
Msg( "VMPI running in SDK mode.\n" );
}
}
void VMPI_SetupAutoRestartParameters( int argc, char **argv )
{
if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_AutoRestart ) ) )
{
g_OriginalCommandLineParameters.SetSize( argc );
for ( int i=0; i < argc; i++ )
{
g_OriginalCommandLineParameters[i] = CopyString( argv[i] );
}
}
}
bool VMPI_HandleAutoRestart()
{
if ( g_OriginalCommandLineParameters.Count() == 0 )
return true;
Msg( "%s found. Auto-restarting.\n", VMPI_GetParamString( mpi_AutoRestart ) );
DWORD curPriority = GetPriorityClass( GetCurrentProcess() );
char commandLine[1024*8];
commandLine[0] = 0;
// Add the -mpi_worker argument in, then launch the process.
for ( int i=0; i < g_OriginalCommandLineParameters.Count(); i++ )
{
char argStr[512];
Q_snprintf( argStr, sizeof( argStr ), "\"%s\" ", g_OriginalCommandLineParameters[i] );
Q_strncat( commandLine, argStr, sizeof( commandLine ), COPY_ALL_CHARACTERS );
}
STARTUPINFO si;
memset( &si, 0, sizeof( si ) );
si.cb = sizeof( si );
PROCESS_INFORMATION pi;
memset( &pi, 0, sizeof( pi ) );
if ( CreateProcess(
NULL,
commandLine,
NULL, // security
NULL,
TRUE,
CREATE_NEW_CONSOLE | curPriority, // flags
NULL, // environment
NULL,
&si,
&pi ) )
{
g_OriginalCommandLineParameters.Purge();
return true;
}
else
{
char errStr[1024];
IP_GetLastErrorString( errStr, sizeof( errStr ) );
Warning( " - ERROR in CreateProcess (%s)!\n", errStr );
return false;
}
}
bool VMPI_Init(
int &argc,
char **&argv,
const char *pDependencyFilename,
VMPI_Disconnect_Handler handler,
VMPIRunMode runMode,
bool bConnectingAsService
)
{
if ( handler )
VMPI_AddDisconnectHandler( handler );
VMPI_SetupAutoRestartParameters( argc, argv );
VMPI_CheckSDKMode( argc, argv );
VMPI_InitGlobals( argc, argv, runMode );
// Were we launched by the vmpi service as a worker?
const char *pMasterIP = VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Worker ), NULL );
if ( pMasterIP )
{
CIPAddr addr;
addr.port = VMPI_MASTER_FIRST_PORT;
if ( !ConvertStringToIPAddr( pMasterIP, &addr ) )
Error( "Unable to parse or resolve master IP (%s).\n", pMasterIP );
return MPI_Init_Worker( argc, argv, addr, bConnectingAsService );
}
else
{
if ( !pDependencyFilename )
{
Error( "VMPI started as master, but no dependency filename specified.\n" );
return false;
}
return InitMaster( argc, argv, pDependencyFilename, runMode, false );
}
}
void VMPI_Init_PatchMaster( int argc, char **argv )
{
const char *pPatchDirectory = VMPI_FindArg( argc, argv, "-mpi_PatchDirectory", NULL );
if ( !pPatchDirectory )
Error( "-mpi_PatchDirectory <dir> must be specified if using -PatchHost mode." );
VMPI_InitGlobals( argc, argv, VMPI_RUN_NETWORKED );
InitMaster( argc, argv, pPatchDirectory, VMPI_RUN_NETWORKED, true );
}
void VMPI_Finalize()
{
g_MasterBroadcaster.Term();
DistributeWork_Cancel();
// Get rid of all the sockets.
for ( int iConn=0; iConn < g_nConnections; iConn++ )
delete g_Connections[iConn];
g_nConnections = 0;
// Get rid of all the packets.
FOR_EACH_LL( g_VMPIMessages, i )
{
g_VMPIMessages[i]->Release();
}
g_VMPIMessages.Purge();
g_PersistentPackets.PurgeAndDeleteElements();
// Get rid of the message buffers
g_DispatchBuffers.Purge();
if ( g_hKernel32DLL )
{
FreeLibrary( g_hKernel32DLL );
g_hKernel32DLL = NULL;
}
g_WorkerCommandLine.PurgeAndDeleteElements();
VMPI_HandleAutoRestart();
}
VMPIRunMode VMPI_GetRunMode()
{
return g_VMPIRunMode;
}
VMPIFileSystemMode VMPI_GetFileSystemMode()
{
return g_VMPIFileSystemMode;
}
int VMPI_GetCurrentNumberOfConnections()
{
return g_nConnections;
}
void InternalHandleSocketErrors()
{
// Copy the list of sockets with errors into a local array so we can handle all the errors outside
// the mutex, thus avoiding potential deadlock if any error handlers call Error().
CUtlVector<CVMPIConnection*> errorSockets;
CCriticalSectionLock csLock( &g_ErrorSocketsCS );
csLock.Lock();
errorSockets.SetSize( g_ErrorSockets.Count() );
int iCur = 0;
FOR_EACH_LL( g_ErrorSockets, i )
{
errorSockets[iCur++] = g_ErrorSockets[i];
}
g_ErrorSockets.Purge();
csLock.Unlock();
// Handle the errors.
for ( int i=0; i < errorSockets.Count(); i++ )
{
errorSockets[i]->HandleDisconnect();
}
UpdateActiveConnectionsText();
}
void VMPI_HandleSocketErrors( unsigned long timeout )
{
DWORD ret = WaitForSingleObject( g_ErrorSocketsEvent.GetEventHandle(), timeout );
if ( ret == WAIT_OBJECT_0 )
{
InternalHandleSocketErrors();
}
}
// If bWait is false, then this function returns false immediately if there are no messages waiting.
bool VMPI_GetNextMessage( MessageBuffer *pBuf, int *pSource, unsigned long startTimeout )
{
HANDLE handles[2] = { g_ErrorSocketsEvent.GetEventHandle(), g_VMPIMessagesEvent.GetEventHandle() };
DWORD startTime = Plat_MSTime();
DWORD timeout = startTimeout;
while ( 1 )
{
DWORD ret = WaitForMultipleObjects( ARRAYSIZE( handles ), handles, FALSE, timeout );
if ( ret == WAIT_TIMEOUT )
{
return false;
}
else if ( ret == WAIT_OBJECT_0 )
{
// A socket had an error. Handle all socket errors.
InternalHandleSocketErrors();
// Update the timeout.
DWORD delta = Plat_MSTime() - startTime;
if ( delta >= startTimeout )
return false;
timeout = startTimeout - delta;
continue;
}
else if ( ret == (WAIT_OBJECT_0 + 1) )
{
// Read out the next message.
CCriticalSectionLock csLock( &g_VMPIMessagesCS );
csLock.Lock();
GrabNextMessage:;
int iHead = g_VMPIMessages.Head();
CTCPPacket *pPacket = g_VMPIMessages[iHead];
g_VMPIMessages.Remove( iHead );
// Set the event again if there are more messages waiting.
const char *pBase = pPacket->GetData();
if ( pPacket->GetLen() >= 6 && (unsigned char)pBase[0] == VMPI_INTERNAL_PACKET_ID && (unsigned char)pBase[1] == VMPI_INTERNAL_SUBPACKET_GROUPED_PACKET )
{
// Ok, this is a grouped packet. Split it out into a bunch of separate packets.
CUtlVector<CTCPPacket*> groupedPackets;
int iCurOffset = 2;
while ( (iCurOffset+4) <= pPacket->GetLen() )
{
int curPacketLen = *((int*)&pBase[iCurOffset]);
if ( iCurOffset + curPacketLen > pPacket->GetLen() )
Error( "Invalid chunked packet\n" );
iCurOffset += 4;
CTCPPacket *pChunkPacket = (CTCPPacket*)malloc( sizeof( CTCPPacket ) + curPacketLen - 1 );
pChunkPacket->m_Len = curPacketLen;
pChunkPacket->m_UserData = pPacket->m_UserData;
memcpy( pChunkPacket->m_Data, &pBase[iCurOffset], curPacketLen );
groupedPackets.AddToTail( pChunkPacket );
iCurOffset += curPacketLen;
}
for ( int i=0; i < groupedPackets.Count(); i++ )
{
g_VMPIMessages.AddToHead( groupedPackets[groupedPackets.Count() - i - 1] );
}
pPacket->Release();
goto GrabNextMessage;
}
else
{
if ( g_VMPIMessages.Count() > 0 )
g_VMPIMessagesEvent.SetEvent();
}
csLock.Unlock();
// Copy it into their message buffer.
pBuf->setLen( pPacket->GetLen() );
memcpy( pBuf->data, pPacket->GetData(), pPacket->GetLen() );
*pSource = pPacket->GetUserData();
Assert( *pSource >= 0 && *pSource < g_nConnections );
// Update global stats about how much data we've received.
++g_nMessagesReceived;
g_nBytesReceived += pPacket->GetLen() + 4; // (4 bytes extra for the packet length)
// Free the memory associated with the packet.
pPacket->Release();
return true;
}
else
{
Error( "VMPI_GetNextMessage: WaitForSingleObject returned %lu", ret );
return false;
}
}
}
bool VMPI_InternalDispatch( MessageBuffer *pBuf, int iSource )
{
if ( pBuf->getLen() >= 1 &&
pBuf->data[0] >= 0 && pBuf->data[0] < MAX_VMPI_PACKET_IDS &&
g_VMPIDispatch[pBuf->data[0]] )
{
return g_VMPIDispatch[ pBuf->data[0] ]( pBuf, iSource, pBuf->data[0] );
}
else
{
return false;
}
}
bool VMPI_DispatchNextMessage( unsigned long timeout )
{
MessageBuffer *pBuf = NULL;
if ( !g_DispatchBuffers.PopItem( &pBuf ) )
{
pBuf = new MessageBuffer();
}
bool bRetval = true;
while ( 1 )
{
int iSource;
if ( VMPI_GetNextMessage( pBuf, &iSource, timeout ) )
{
if ( VMPI_InternalDispatch( pBuf, iSource ) )
{
break;
}
else
{
// Workers running in service mode don't hook anything except filesystem stuff, so if they happen to be sent something, no problem.
if ( !VMPI_IsProcAService( iSource ) )
{
// Oops! What is this packet?
Assert( false );
}
}
}
else
{
bRetval = false;
break;
}
}
g_DispatchBuffers.PushItem( pBuf );
return bRetval;
}
bool VMPI_DispatchUntil( MessageBuffer *pBuf, int *pSource, int packetID, int subPacketID, bool bWait )
{
while ( 1 )
{
if ( !VMPI_GetNextMessage( pBuf, pSource, bWait ? VMPI_TIMEOUT_INFINITE : 0 ) )
return false;
if ( !VMPI_InternalDispatch( pBuf, *pSource ) )
{
if ( pBuf->getLen() >= 1 && (unsigned char)pBuf->data[0] == packetID )
{
if ( subPacketID == -1 )
return true;
if ( pBuf->getLen() >= 2 && (unsigned char)pBuf->data[1] == subPacketID )
return true;
}
// Oops! What is this packet?
// Note: the most common case where this happens is if it finishes a BuildFaceLights run
// and is in an AppBarrier and one of the workers is still finishing up some work given to it.
// It'll be waiting for a barrier packet, and it'll get results. In that case, the packet should
// be discarded like we do here, so maybe this assert won't be necessary.
//Assert( false );
}
}
}
bool VMPI_SendData( void *pData, int nBytes, int iDest, int fVMPISendFlags )
{
return VMPI_SendChunks( &pData, &nBytes, 1, iDest, fVMPISendFlags );
}
inline bool VMPI_FilterPacketsForServiceDownloader( CVMPIConnection *pConnection, void const * const *pChunks, const int *pChunkLengths, int nChunks )
{
if ( pConnection->m_bIsAService )
{
// Find the first byte and treat that as the packet ID.
for ( int i=0; i < nChunks; i++ )
{
if ( pChunkLengths[i] > 0 )
{
unsigned char cPacketID = *((unsigned char*)pChunks[i]);
if ( cPacketID == VMPI_INTERNAL_PACKET_ID || cPacketID == VMPI_SHARED_PACKET_ID || cPacketID == VMPI_PACKETID_FILESYSTEM )
return false;
else
return true;
}
}
}
return false;
}
void VMPI_GroupPackets( CVMPIConnection *pConn, void const * const *pChunks, const int *pChunkLengths, int nChunks )
{
CCriticalSectionLock connectionsLock( &g_ConnectionsCS );
connectionsLock.Lock();
// First add the header.
if ( pConn->m_GroupedChunks.Count() == 0 )
{
pConn->m_GroupedChunks.AddToTail( g_GroupedPacketHeader );
pConn->m_GroupedChunkLengths.AddToTail( sizeof( g_GroupedPacketHeader ) );
}
// Collate the chunks.
int nTotalLength = 0;
for ( int i=0; i < nChunks; i++ )
nTotalLength += pChunkLengths[i];
char *pOut = new char[nTotalLength + 4];
*((int*)pOut) = nTotalLength;
int iOutByte = 4;
for ( int i=0; i < nChunks; i++ )
{
memcpy( &pOut[iOutByte], pChunks[i], pChunkLengths[i] );
iOutByte += pChunkLengths[i];
}
pConn->m_GroupedChunks.AddToTail( pOut );
pConn->m_GroupedChunkLengths.AddToTail( nTotalLength + 4 );
}
void VMPI_FlushGroupedPackets( unsigned long msInterval )
{
if ( msInterval != 0 )
{
unsigned long curTime = Plat_MSTime();
if ( curTime - g_LastFlushGroupedPacketsTime < msInterval )
return;
g_LastFlushGroupedPacketsTime = curTime;
}
CCriticalSectionLock connectionsLock( &g_ConnectionsCS );
connectionsLock.Lock();
for ( int i=0; i < g_nConnections; i++ )
{
CVMPIConnection *pConn = g_Connections[i];
if ( !pConn )
continue;
IThreadedTCPSocket *pSocket = pConn->GetSocket();
if ( !pSocket || pConn->m_GroupedChunks.Count() == 0 )
continue;
pSocket->SendChunks( pConn->m_GroupedChunks.Base(), pConn->m_GroupedChunkLengths.Base(), pConn->m_GroupedChunks.Count() );
// Free the chunks.
for ( int i=1; i < pConn->m_GroupedChunks.Count(); i++ )
{
free( pConn->m_GroupedChunks[i] );
}
pConn->m_GroupedChunks.RemoveAll();
pConn->m_GroupedChunkLengths.RemoveAll();
}
}
bool VMPI_SendChunks( void const * const *pChunks, const int *pChunkLengths, int nChunks, int iDest, int fVMPISendFlags )
{
if ( iDest == VMPI_SEND_TO_ALL )
{
// Don't want new connections while in here!
CCriticalSectionLock connectionsLock( &g_ConnectionsCS );
connectionsLock.Lock();
for ( int i=0; i < g_nConnections; i++ )
VMPI_SendChunks( pChunks, pChunkLengths, nChunks, i );
return true;
}
else if ( iDest == VMPI_PERSISTENT )
{
// Don't want new connections while in here!
CCriticalSectionLock connectionsLock( &g_ConnectionsCS );
connectionsLock.Lock();
CCriticalSectionLock csLock( &g_PersistentPacketsCS );
csLock.Lock();
// Send the packet to everyone.
for ( int i=0; i < g_nConnections; i++ )
VMPI_SendChunks( pChunks, pChunkLengths, nChunks, i );
// Remember to send it to the new workers.
if ( iDest == VMPI_PERSISTENT )
{
PersistentPacket *pNew = new PersistentPacket;
for ( int i=0; i < nChunks; i++ )
pNew->AddMultipleToTail( pChunkLengths[i], (const char*)pChunks[i] );
g_PersistentPackets.AddToTail( pNew );
}
return true;
}
else
{
g_nMessagesSent++;
g_nBytesSent += 4; // for message tag.
for ( int i=0; i < nChunks; i++ )
g_nBytesSent += pChunkLengths[i];
CVMPIConnection *pConnection = g_Connections[iDest];
if ( pConnection )
{
// If it's a service downloader, only send certain packet IDs.
if ( VMPI_FilterPacketsForServiceDownloader( pConnection, pChunks, pChunkLengths, nChunks ) )
return true;
IThreadedTCPSocket *pSocket = pConnection->GetSocket();
if ( !pSocket )
return false;
if ( g_bGroupPackets && (fVMPISendFlags & k_eVMPISendFlags_GroupPackets) )
{
VMPI_GroupPackets( pConnection, pChunks, pChunkLengths, nChunks );
return true;
}
else
{
return pSocket->SendChunks( pChunks, pChunkLengths, nChunks );
}
}
else
{
return false;
}
}
}
bool VMPI_Send2Chunks( const void *pChunk1, int chunk1Len, const void *pChunk2, int chunk2Len, int iDest, int fVMPISendFlags )
{
const void *pChunks[2] = { pChunk1, pChunk2 };
int len[2] = { chunk1Len, chunk2Len };
return VMPI_SendChunks( pChunks, len, ARRAYSIZE( pChunks ), iDest, fVMPISendFlags );
}
bool VMPI_Send3Chunks( const void *pChunk1, int chunk1Len, const void *pChunk2, int chunk2Len, const void *pChunk3, int chunk3Len, int iDest, int fVMPISendFlags )
{
const void *pChunks[3] = { pChunk1, pChunk2, pChunk3 };
int len[3] = { chunk1Len, chunk2Len, chunk3Len };
return VMPI_SendChunks( pChunks, len, ARRAYSIZE( pChunks ), iDest, fVMPISendFlags );
}
void VMPI_AddDisconnectHandler( VMPI_Disconnect_Handler handler )
{
g_DisconnectHandlers.AddToTail( handler );
}
CVMPIConnection* GetConnection( int procID )
{
Assert( procID >= 0 && procID < g_nConnections );
return g_Connections[procID];
}
bool VMPI_IsProcConnected( int procID )
{
if ( procID < 0 || procID >= g_nConnections )
{
Assert( false );
return false;
}
return g_Connections[procID]->GetSocket() != NULL;
}
bool VMPI_IsProcAService( int procID )
{
if ( procID < 0 || procID >= g_nConnections )
{
Assert( false );
return false;
}
return g_Connections[procID]->m_bIsAService;
}
void VMPI_Sleep( unsigned long ms )
{
Sleep( ms );
}
const char* VMPI_GetMachineName( int iProc )
{
if ( g_bMPIMaster && iProc == VMPI_MASTER_ID )
return VMPI_GetLocalMachineName();
if ( iProc < 0 || iProc >= g_nConnections )
{
Assert( false );
return "invalid index";
}
return g_Connections[iProc]->GetMachineName();
}
void VMPI_SetMachineName( int iProc, const char *pName )
{
if ( iProc < 0 || iProc >= g_nConnections )
{
Assert( false );
return;
}
g_Connections[iProc]->SetMachineName( pName );
}
bool VMPI_HasMachineNameBeenSet( int iProc )
{
if ( iProc < 0 || iProc >= g_nConnections )
{
Assert( false );
return false;
}
return g_Connections[iProc]->HasMachineNameBeenSet();
}
const char* VMPI_GetLocalMachineName()
{
static char cName[MAX_COMPUTERNAME_LENGTH+1];
DWORD len = sizeof( cName );
if ( GetComputerName( cName, &len ) )
return cName;
else
return "(error in GetComputerName)";
}
unsigned long VMPI_GetJobWorkerID( int iProc )
{
return GetConnection( iProc )->m_JobWorkerID;
}
void VMPI_SetJobWorkerID( int iProc, unsigned long jobWorkerID )
{
GetConnection( iProc )->m_JobWorkerID = jobWorkerID;
}
void VMPI_GetCurrentStage( char *pOut, int strLen )
{
CCriticalSectionLock csLock( &g_CurrentStageCS );
csLock.Lock();
Q_strncpy( pOut, g_CurrentStageString, strLen );
}
void VMPI_SetCurrentStage( const char *pCurStage )
{
CCriticalSectionLock csLock( &g_CurrentStageCS );
csLock.Lock();
Q_strncpy( g_CurrentStageString, pCurStage, sizeof( g_CurrentStageString ) );
}
void VMPI_InviteDebugWorkers()
{
// Only allow workers with password set to debugworker.
g_MasterBroadcaster.SetPassword( "debugworker" );
// Disable timeouts so they can sit in the debugger.
g_MasterBroadcaster.SetNoTimeoutOption();
ThreadedTCP_EnableTimeouts( false );
// Let in some more workers.
g_MasterBroadcaster.IncreaseMaxWorkers( 25 );
}
bool VMPI_IsSDKMode()
{
if ( g_bVMPISDKModeSet )
return g_bVMPISDKMode;
else
return !VMPI_CheckForNonSDKExecutables();
}
const char* VMPI_GetParamString( EVMPICmdLineParam eParam )
{
if ( eParam <= k_eVMPICmdLineParam_FirstParam || eParam >= k_eVMPICmdLineParam_LastParam )
{
Assert( false );
Warning( "Invalid call: VMPI_GetParamString( %d )\n", eParam );
return "unknown";
}
else
{
return g_VMPIParams[eParam].m_pName;
}
}
int VMPI_GetParamFlags( EVMPICmdLineParam eParam )
{
if ( eParam <= k_eVMPICmdLineParam_FirstParam || eParam >= k_eVMPICmdLineParam_LastParam )
{
Assert( false );
Warning( "Invalid call: VMPI_GetParamString( %d )\n", eParam );
return 0;
}
else
{
return g_VMPIParams[eParam].m_ParamFlags;
}
}
bool VMPI_IsParamUsed( EVMPICmdLineParam eParam )
{
int iParam = CommandLine()->FindParm( VMPI_GetParamString( eParam ) );
return iParam != 0;
}
const char* VMPI_GetParamHelpString( EVMPICmdLineParam eParam )
{
if ( eParam <= k_eVMPICmdLineParam_FirstParam || eParam >= k_eVMPICmdLineParam_LastParam )
{
Assert( false );
Warning( "Invalid call: VMPI_GetParamHelpString( %d )\n", eParam );
return "unknown vmpi param";
}
else
{
return g_VMPIParams[eParam].m_pHelpText;
}
}