source-engine/gcsdk/gcparalleljobfarm.cpp

120 lines
3.5 KiB
C++
Raw Normal View History

2020-04-22 16:56:21 +00:00
//====== Copyright (c), Valve Corporation, All rights reserved. =======
//
// Purpose: Implements parallel job farming process
//
//=============================================================================
#include "stdafx.h"
#include "rtime.h"
#include "gcparalleljobfarm.h"
namespace GCSDK
{
//-----------------------------------------------------------------------------
// Purpose: Runs this handler's workload through a bounded set of farmed jobs
//			and blocks (yielding) until the farm reports that it has drained.
// Input:	numJobsParallel - how many farmed jobs may be outstanding at once;
//				values below 1 are treated as 1.
//			pchJobName - name given to the farmed jobs and to the wait; when
//				NULL the name of the currently running job is used.
//			nTimeoutSec - if non-zero, each farmed job applies it through
//				SetJobTimeout before running its workload.
// Output:	true only when some farmed job flagged the workload as completed
//			and no farmed job reported a failure; false otherwise, including
//			when waiting for the farm fails.
//-----------------------------------------------------------------------------
bool IYieldingParallelFarmJobHandler::BYieldingExecuteParallel( int numJobsParallel, char const *pchJobName, uint nTimeoutSec )
{
	AssertRunningJob();

	// No explicit name supplied: borrow the name of the job we are running in
	if ( pchJobName == NULL )
	{
		pchJobName = GJobCur().GetName();
	}

	// Bookkeeping shared between this function and every farmed job.
	// It is heap allocated because, on the failure path below, it has to
	// stay alive after this function has already returned.
	struct CParallelFarmHeapData_t
	{
		explicit CParallelFarmHeapData_t( IYieldingParallelFarmJobHandler *pHandler, int numJobsFarmLimit )
			: m_pHandler( pHandler )
			, m_jobIdParent( GJobCur().GetJobID() )
			, m_numJobsFarmLimit( MAX( 1, numJobsFarmLimit ) )
			, m_numJobsFarmed( 0 )
			, m_iJobSequenceCounter( 0 )
			, m_bErrorEncountered( false )
			, m_bWorkloadCompleted( false )
		{
		}

		IYieldingParallelFarmJobHandler *m_pHandler;	// workload provider; reset to NULL if the parent gives up
		JobID_t m_jobIdParent;							// job that receives the "work item completed" notification
		int m_numJobsFarmLimit;							// max farmed jobs outstanding at once (always >= 1)
		int m_numJobsFarmed;							// farmed jobs currently outstanding
		int m_iJobSequenceCounter;						// sequence index handed to the next farmed job
		bool m_bErrorEncountered;						// a workload failed, or the parent failed to sync
		bool m_bWorkloadCompleted;						// a workload reported that everything is done
	};

	// One farmed unit of work. Runs a single workload slice, then either
	// farms its own replacement or, if it was the last one outstanding,
	// notifies the parent job.
	class CYieldingParallelFarmJob : public CGCJob
	{
	public:
		CYieldingParallelFarmJob( CGCBase *pGC, CParallelFarmHeapData_t *pJobData, char const *pchJobName, uint nTimeoutSec )
			: CGCJob( pGC, pchJobName )
			, m_pJobData( pJobData )
			, m_iJobSequenceCounter( pJobData->m_iJobSequenceCounter )	// snapshot of the shared counter at creation time
			, m_nTimeoutSec( nTimeoutSec )
		{
		}

		virtual bool BYieldingRunJob( void *pvStartParam )
		{
			CParallelFarmHeapData_t *pShared = m_pJobData;

			if ( m_nTimeoutSec )
			{
				SetJobTimeout( m_nTimeoutSec );
			}

			// A NULL handler means the parent abandoned the farm; count that as a failure
			bool bWorkloadCompleted = false;
			bool bSliceOk = false;
			if ( pShared->m_pHandler )
			{
				bSliceOk = pShared->m_pHandler->BYieldingRunWorkload( m_iJobSequenceCounter, &bWorkloadCompleted );
			}

			// Publish the outcome; a failure takes precedence over a completion report
			if ( !bSliceOk )
			{
				pShared->m_bErrorEncountered = true;
			}
			else if ( bWorkloadCompleted )
			{
				pShared->m_bWorkloadCompleted = true;
			}

			// This job no longer counts as outstanding
			-- pShared->m_numJobsFarmed;

			// Keep the farm saturated unless somebody flagged an error or completion
			if ( !( pShared->m_bErrorEncountered || pShared->m_bWorkloadCompleted ) )
			{
				// The constructor reads the sequence counter, so bump it only afterwards
				CYieldingParallelFarmJob *pReplacement = new CYieldingParallelFarmJob( m_pGC, pShared, GetName(), m_nTimeoutSec );
				++ pShared->m_numJobsFarmed;
				++ pShared->m_iJobSequenceCounter;
				pReplacement->StartJobDelayed( NULL );
			}

			// Nothing outstanding any more: tell the parent job the farm has drained
			if ( pShared->m_numJobsFarmed == 0 )
			{
				m_pGC->GetJobMgr().BRouteWorkItemCompletedDelayed( pShared->m_jobIdParent, false );
			}

			return bSliceOk;
		}

	protected:
		CParallelFarmHeapData_t *m_pJobData;	// shared farm state (not owned by the job)
		int m_iJobSequenceCounter;				// sequence index passed to the workload
		uint m_nTimeoutSec;						// per-job timeout, 0 = leave the default
	};

	CParallelFarmHeapData_t *pFarm = new CParallelFarmHeapData_t( this, numJobsParallel );

	// Seed the farm up to its limit; each seeded job gets the next sequence index
	while ( pFarm->m_numJobsFarmed < pFarm->m_numJobsFarmLimit )
	{
		CYieldingParallelFarmJob *pSeedJob = new CYieldingParallelFarmJob( GGCBase(), pFarm, pchJobName, nTimeoutSec );
		++ pFarm->m_numJobsFarmed;
		pSeedJob->StartJobDelayed( NULL );
		++ pFarm->m_iJobSequenceCounter;
	}

	// Yield until the last farmed job routes its completion back to us
	if ( !GJobCur().BYieldingWaitForWorkItem( pchJobName ) )
	{
		// NOTE(review): m_numJobsFarmed is a signed int but is formatted with %u
		EmitError( SPEW_GC, "YieldingExecuteParallel: failed to sync with %u farmed work items.\n", pFarm->m_numJobsFarmed );
		pFarm->m_bErrorEncountered = true;
		pFarm->m_pHandler = NULL; // handler itself may become invalid when the function returns

		// pFarm is leaked on purpose: work items might still be running and
		// would touch it, so leaking avoids a crash (this condition is abnormal)
		return false;
	}

	const bool bSucceeded = pFarm->m_bWorkloadCompleted && !pFarm->m_bErrorEncountered;
	delete pFarm;
	return bSucceeded;
}
}