source-engine/utils/vmpi/vmpi_distribute_work_default.cpp

603 lines
16 KiB
C++
Raw Permalink Normal View History

2020-04-22 16:56:21 +00:00
//========= Copyright Valve Corporation, All rights reserved. ============//
//
// Purpose:
//
//=============================================================================
#include "vmpi.h"
#include "vmpi_distribute_work.h"
#include "tier0/platform.h"
#include "tier0/dbg.h"
#include "utlvector.h"
#include "utllinkedlist.h"
#include "vmpi_dispatch.h"
#include "pacifier.h"
#include "vstdlib/random.h"
#include "mathlib/mathlib.h"
#include "threadhelpers.h"
#include "threads.h"
#include "tier1/strtools.h"
#include "tier1/utlmap.h"
#include "tier1/smartptr.h"
#include "tier0/icommandline.h"
#include "cmdlib.h"
#include "vmpi_distribute_tracker.h"
#include "vmpi_distribute_work_internal.h"
#define DW_SUBPACKETID_WU_ASSIGNMENT (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+0)
static int s_numWusToDeal = -1;
void VMPI_SetWorkUnitsPartitionSize( int numWusToDeal )
{
s_numWusToDeal = numWusToDeal;
}
class CWorkUnitInfo
{
public:
WUIndexType m_iWorkUnit;
};
class CWULookupInfo
{
public:
CWULookupInfo() : m_iWUInfo( -1 ), m_iPartition( -222222 ), m_iPartitionListIndex( -1 ) {}
public:
int m_iWUInfo; // Index into m_WUInfo.
int m_iPartition; // Which partition it's in.
int m_iPartitionListIndex; // Index into its partition's m_WUs.
};
class CPartitionInfo
{
public:
typedef CUtlLinkedList< WUIndexType, int > PartitionWUs;
public:
int m_iPartition; // Index into m_Partitions.
int m_iWorker; // Who owns this partition?
PartitionWUs m_WUs; // Which WUs are in this partition?
};
// Work units tracker to track consecutive finished blocks
class CWorkUnitsTracker
{
public:
CWorkUnitsTracker() {}
public:
// Initializes the unit tracker to receive numUnits in future
void PrepareForWorkUnits( uint64 numUnits );
// Signals that a work unit has been finished
// returns a zero-based index of the next pending work unit
// up to which the task list has been processed fully now
// because the received work unit filled the gap or was the next pending work unit.
// returns 0 to indicate that this work unit is a "faster processed future work unit".
uint64 WorkUnitFinished( uint64 iWorkUnit );
public:
enum WUInfo { kNone, kTrigger, kDone };
CVisibleWindowVector< uint8 > m_arrInfo;
};
void CWorkUnitsTracker::PrepareForWorkUnits( uint64 numUnits )
{
m_arrInfo.Reset( numUnits + 1 );
if ( numUnits )
{
m_arrInfo.ExpandWindow( 2ull, kNone );
m_arrInfo.Get( 0ull ) = kTrigger;
}
}
uint64 CWorkUnitsTracker::WorkUnitFinished( uint64 iWorkUnit )
{
uint64 uiResult = uint64( 0 );
if ( iWorkUnit >= m_arrInfo.FirstPossibleIndex() && iWorkUnit < m_arrInfo.PastPossibleIndex() )
{
// Need to access the element
m_arrInfo.ExpandWindow( iWorkUnit + 1, kNone );
// Set it done
uint8 &rchThere = m_arrInfo.Get( iWorkUnit ), chThere = rchThere;
rchThere = kDone;
// Should we trigger?
if ( kTrigger == chThere )
{
// Go along all "done" work units and trigger the last found one
while ( ( ( ++ iWorkUnit ) < m_arrInfo.PastVisibleIndex() ) &&
( kDone == m_arrInfo.Get( iWorkUnit ) ) )
continue;
m_arrInfo.Get( iWorkUnit ) = kTrigger;
m_arrInfo.ShrinkWindow( iWorkUnit - 1 );
uiResult = iWorkUnit;
}
else if( iWorkUnit == m_arrInfo.FirstPossibleIndex() )
{
// Go along all "done" work units and shrink including the last found one
while ( ( ( ++ iWorkUnit ) < m_arrInfo.PastVisibleIndex() ) &&
( kDone == m_arrInfo.Get( iWorkUnit ) ) )
continue;
m_arrInfo.ShrinkWindow( iWorkUnit - 1 );
}
}
return uiResult;
}
CWorkUnitsTracker g_MasterWorkUnitsTracker;
static bool CompareSoonestWorkUnitSets( CPartitionInfo::PartitionWUs * const &x, CPartitionInfo::PartitionWUs * const &y )
{
// Compare by fourth/second/first job in the partitions
WUIndexType missing = ~WUIndexType(0);
WUIndexType jobsX[4] = { missing, missing, missing, missing };
WUIndexType jobsY[4] = { missing, missing, missing, missing };
int counter = 0;
counter = 0;
FOR_EACH_LL( (*x), i )
{
jobsX[ counter ++ ] = (*x)[i];
if ( counter >= 4 )
break;
}
counter = 0;
FOR_EACH_LL( (*y), i )
{
jobsY[ counter ++ ] = (*y)[i];
if ( counter >= 4 )
break;
}
// Compare
if ( jobsX[3] != jobsY[3] )
return ( jobsX[3] < jobsY[3] );
if ( jobsX[1] != jobsY[1] )
return ( jobsX[1] < jobsY[1] );
return jobsX[0] < jobsY[0];
}
class CDistributor_DefaultMaster : public IWorkUnitDistributorMaster
{
public:
virtual void Release()
{
delete this;
}
virtual void DistributeWork_Master( CDSInfo *pInfo )
{
m_pInfo = pInfo;
g_MasterWorkUnitsTracker.PrepareForWorkUnits( m_pInfo->m_nWorkUnits );
m_WULookup.Reset( pInfo->m_nWorkUnits );
while ( m_WULookup.FirstPossibleIndex() < m_WULookup.PastPossibleIndex() )
{
VMPI_DispatchNextMessage( 200 );
VMPITracker_HandleDebugKeypresses();
if ( g_pDistributeWorkCallbacks && g_pDistributeWorkCallbacks->Update() )
break;
}
}
virtual void OnWorkerReady( int iSource )
{
AssignWUsToWorker( iSource );
}
virtual bool HandleWorkUnitResults( WUIndexType iWorkUnit )
{
CWULookupInfo *pLookup = NULL;
if ( iWorkUnit >= m_WULookup.FirstPossibleIndex() && iWorkUnit < m_WULookup.PastVisibleIndex() )
pLookup = &m_WULookup.Get( iWorkUnit );
if ( !pLookup || pLookup->m_iWUInfo == -1 )
return false;
// Mark this WU finished and remove it from the list of pending WUs.
m_WUInfo.Remove( pLookup->m_iWUInfo );
pLookup->m_iWUInfo = -1;
// Get rid of the WU from its partition.
int iPartition = pLookup->m_iPartition;
CPartitionInfo *pPartition = m_Partitions[iPartition];
pPartition->m_WUs.Remove( pLookup->m_iPartitionListIndex );
// Shrink the window of the lookup work units
if ( iWorkUnit == m_WULookup.FirstPossibleIndex() )
{
WUIndexType kwu = iWorkUnit;
for ( WUIndexType kwuEnd = m_WULookup.PastVisibleIndex(); kwu < kwuEnd; ++ kwu )
{
if ( -1 != m_WULookup.Get( kwu ).m_iWUInfo && kwu > iWorkUnit )
break;
}
m_WULookup.ShrinkWindow( kwu - 1 );
}
// Give the worker some new work if need be.
if ( pPartition->m_WUs.Count() == 0 )
{
int iPartitionWorker = pPartition->m_iWorker;
delete pPartition;
m_Partitions.Remove( iPartition );
// If there are any more WUs remaining, give the worker from this partition some more of them.
if ( m_WULookup.FirstPossibleIndex() < m_WULookup.PastPossibleIndex() )
{
AssignWUsToWorker( iPartitionWorker );
}
}
uint64 iDoneWorkUnits = g_MasterWorkUnitsTracker.WorkUnitFinished( iWorkUnit );
if ( iDoneWorkUnits && g_pDistributeWorkCallbacks )
{
g_pDistributeWorkCallbacks->OnWorkUnitsCompleted( iDoneWorkUnits );
}
return true;
}
virtual void DisconnectHandler( int workerID )
{
int iPartitionLookup = FindPartitionByWorker( workerID );
if ( iPartitionLookup != -1 )
{
// Mark this guy's partition as unowned so another worker can get it.
CPartitionInfo *pPartition = m_Partitions[iPartitionLookup];
pPartition->m_iWorker = -1;
}
}
CPartitionInfo* AddPartition( int iWorker )
{
CPartitionInfo *pNew = new CPartitionInfo;
pNew->m_iPartition = m_Partitions.AddToTail( pNew );
pNew->m_iWorker = iWorker;
return pNew;
}
bool SplitWUsPartition( CPartitionInfo *pPartitionLarge,
CPartitionInfo **ppFirstHalf, CPartitionInfo **ppSecondHalf,
int iFirstHalfWorker, int iSecondHalfWorker )
{
int nCount = pPartitionLarge->m_WUs.Count();
if ( nCount > 1 ) // Allocate the partitions for the two workers
{
*ppFirstHalf = AddPartition( iFirstHalfWorker );
*ppSecondHalf = AddPartition( iSecondHalfWorker );
}
else // Specially transfer a partition with too few work units
{
*ppFirstHalf = NULL;
*ppSecondHalf = AddPartition( iSecondHalfWorker );
}
// Prepare for transfer
CPartitionInfo *arrNewParts[2] = { *ppFirstHalf ? *ppFirstHalf : *ppSecondHalf, *ppSecondHalf };
// Transfer the work units:
// alternate first/second halves
// don't put more than "half deal units" tasks into the second half
// e.g. { 1, 2, 3, 4 }
// becomes: 1st half { 1, 2 }, 2nd half { 3, 4 }
for ( int k = 0; k < nCount; ++ k )
{
int iHead = pPartitionLarge->m_WUs.Head();
WUIndexType iWU = pPartitionLarge->m_WUs[ iHead ];
pPartitionLarge->m_WUs.Remove( iHead );
/*
int nHalf = !!( ( k % 2 ) || ( k >= nCount - 1 ) );
if ( k == 5 ) // no more than 2 jobs to branch off
arrNewParts[ 1 ] = arrNewParts[ 0 ];
*/
int nHalf = !( k < nCount/2 );
CPartitionInfo *pTo = arrNewParts[ nHalf ];
CWULookupInfo &li = m_WULookup.Get( iWU );
li.m_iPartition = pTo->m_iPartition;
li.m_iPartitionListIndex = pTo->m_WUs.AddToTail( iWU );
}
// LogPartitionsWorkUnits( pInfo );
return true;
}
void AssignWUsToWorker( int iWorker )
{
// Get rid of this worker's old partition.
int iPrevious = FindPartitionByWorker( iWorker );
if ( iPrevious != -1 )
{
delete m_Partitions[iPrevious];
m_Partitions.Remove( iPrevious );
}
if ( g_iVMPIVerboseLevel >= 1 )
Msg( "A" );
CVisibleWindowVector< CWULookupInfo > &vlkup = m_WULookup;
if ( CommandLine()->FindParm( "-mpi_NoScheduler" ) )
{
Warning( "\n\n-mpi_NoScheduler found: Warning - this should only be used for testing and with 1 worker!\n\n" );
vlkup.ExpandWindow( m_pInfo->m_nWorkUnits );
CPartitionInfo *pPartition = AddPartition( iWorker );
for ( int i=0; i < m_pInfo->m_nWorkUnits; i++ )
{
CWorkUnitInfo info;
info.m_iWorkUnit = i;
CWULookupInfo &li = vlkup.Get( i );
li.m_iPartition = pPartition->m_iPartition;
li.m_iPartitionListIndex = pPartition->m_WUs.AddToTail( i );
li.m_iWUInfo = m_WUInfo.AddToTail( info );
}
SendPartitionToWorker( pPartition, iWorker );
return;
}
// Any partitions abandoned by workers?
int iAbandonedPartition = FindPartitionByWorker( -1 );
if ( -1 != iAbandonedPartition )
{
CPartitionInfo *pPartition = m_Partitions[ iAbandonedPartition ];
pPartition->m_iWorker = iWorker;
SendPartitionToWorker( pPartition, iWorker );
}
// Any absolutely untouched partitions yet?
else if ( vlkup.PastVisibleIndex() < vlkup.PastPossibleIndex() )
{
// Figure out how many WUs to include in a batch
int numWusToDeal = s_numWusToDeal;
if ( numWusToDeal <= 0 )
{
uint64 uiFraction = vlkup.PastPossibleIndex() / g_nMaxWorkerCount;
Assert( uiFraction < INT_MAX/2 );
numWusToDeal = int( uiFraction );
if ( numWusToDeal <= 0 )
numWusToDeal = 8;
}
// Allocate room for upcoming work units lookup
WUIndexType iBegin = vlkup.PastVisibleIndex();
WUIndexType iEnd = min( iBegin + g_nMaxWorkerCount * numWusToDeal, vlkup.PastPossibleIndex() );
vlkup.ExpandWindow( iEnd - 1 );
// Allocate a partition
size_t numPartitions = min( ( size_t )(iEnd - iBegin), ( size_t )g_nMaxWorkerCount );
CArrayAutoPtr< CPartitionInfo * > spArrPartitions( new CPartitionInfo* [ numPartitions ] );
CPartitionInfo **arrPartitions = spArrPartitions.Get();
arrPartitions[0] = AddPartition( iWorker );
for ( size_t k = 1; k < numPartitions; ++ k )
arrPartitions[k] = AddPartition( -1 );
// Assign upcoming work units to the partitions.
for ( WUIndexType i = iBegin ; i < iEnd; ++ i )
{
CWorkUnitInfo info;
info.m_iWorkUnit = i;
CPartitionInfo *pPartition = arrPartitions[ size_t( (i - iBegin) % numPartitions ) ];
CWULookupInfo &li = vlkup.Get( i );
li.m_iPartition = pPartition->m_iPartition;
li.m_iPartitionListIndex = pPartition->m_WUs.AddToTail( i );
li.m_iWUInfo = m_WUInfo.AddToTail( info );
}
// Now send this guy the WU list in his partition.
SendPartitionToWorker( arrPartitions[0], iWorker );
}
// Split one of the last partitions to finish sooner
else
{
// Find a partition to split.
int iPartToSplit = FindSoonestPartition();
if ( iPartToSplit >= 0 )
{
CPartitionInfo *pPartition = m_Partitions[ iPartToSplit ];
CPartitionInfo *pOldHalf = NULL, *pNewHalf = NULL;
int iOldWorker = pPartition->m_iWorker, iNewWorker = iWorker;
if ( SplitWUsPartition( pPartition, &pOldHalf, &pNewHalf, iOldWorker, iNewWorker ) )
{
if ( pOldHalf )
SendPartitionToWorker( pOldHalf, iOldWorker );
if ( pNewHalf )
SendPartitionToWorker( pNewHalf, iNewWorker );
// Delete the partition that got split
Assert( pPartition->m_WUs.Count() == 0 );
delete pPartition;
m_Partitions.Remove( iPartToSplit );
}
}
}
}
int FindSoonestPartition()
{
CUtlLinkedList < CPartitionInfo *, int > &lst = m_Partitions;
// Sorted partitions
CUtlMap< CPartitionInfo::PartitionWUs *, int > sortedPartitions ( CompareSoonestWorkUnitSets );
sortedPartitions.EnsureCapacity( lst.Count() );
FOR_EACH_LL( lst, i )
{
sortedPartitions.Insert( &lst[i]->m_WUs, i );
}
if ( sortedPartitions.Count() )
{
return sortedPartitions.Element( sortedPartitions.FirstInorder() );
}
return lst.Head();
}
int FindPartitionByWorker( int iWorker )
{
FOR_EACH_LL( m_Partitions, i )
{
if ( m_Partitions[i]->m_iWorker == iWorker )
return i;
}
return -1;
}
void SendPartitionToWorker( CPartitionInfo *pPartition, int iWorker )
{
// Stuff the next nWUs work units into the buffer.
MessageBuffer mb;
PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_WU_ASSIGNMENT );
FOR_EACH_LL( pPartition->m_WUs, i )
{
WUIndexType iWU = pPartition->m_WUs[i];
mb.write( &iWU, sizeof( iWU ) );
VMPITracker_WorkUnitSentToWorker( ( int ) iWU, iWorker );
}
VMPI_SendData( mb.data, mb.getLen(), iWorker );
}
virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents )
{
return false;
}
private:
CDSInfo *m_pInfo;
CUtlLinkedList<CPartitionInfo*,int> m_Partitions;
CVisibleWindowVector<CWULookupInfo> m_WULookup; // Map work unit index to CWorkUnitInfo.
CUtlLinkedList<CWorkUnitInfo,int> m_WUInfo; // Sorted with most elegible WU at the head.
};
class CDistributor_DefaultWorker : public IWorkUnitDistributorWorker
{
public:
virtual void Release()
{
delete this;
}
virtual void Init( CDSInfo *pInfo )
{
}
virtual bool GetNextWorkUnit( WUIndexType *pWUIndex )
{
CCriticalSectionLock csLock( &m_CS );
csLock.Lock();
// NOTE: this is called from INSIDE worker threads.
if ( m_WorkUnits.Count() == 0 )
{
return false;
}
else
{
*pWUIndex = m_WorkUnits[ m_WorkUnits.Head() ];
m_WorkUnits.Remove( m_WorkUnits.Head() );
return true;
}
}
virtual void NoteLocalWorkUnitCompleted( WUIndexType iWU )
{
}
virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents )
{
if ( pBuf->data[1] == DW_SUBPACKETID_WU_ASSIGNMENT )
{
// If the message wasn't even related to the current DistributeWork() call we're on, ignore it.
if ( bIgnoreContents )
return true;
if ( ((pBuf->getLen() - pBuf->getOffset()) % sizeof( WUIndexType )) != 0 )
{
Error( "DistributeWork: invalid work units packet from master" );
}
// Parse out the work unit indices.
CCriticalSectionLock csLock( &m_CS );
csLock.Lock();
m_WorkUnits.Purge();
int nIndices = (pBuf->getLen() - pBuf->getOffset()) / sizeof( WUIndexType );
for ( int i=0; i < nIndices; i++ )
{
WUIndexType iWU;
pBuf->read( &iWU, sizeof( iWU ) );
// Add the index to the list.
m_WorkUnits.AddToTail( iWU );
}
csLock.Unlock();
return true;
}
else
{
return false;
}
}
// Threads eat up the list of WUs in here.
CCriticalSection m_CS;
CUtlLinkedList<WUIndexType, int> m_WorkUnits; // A list of work units assigned to this worker
};
IWorkUnitDistributorMaster* CreateWUDistributor_DefaultMaster()
{
return new CDistributor_DefaultMaster;
}
IWorkUnitDistributorWorker* CreateWUDistributor_DefaultWorker()
{
return new CDistributor_DefaultWorker;
}