VTK/Streaming
From KitwarePublic
Streaming pieces
Streaming a pipeline in a number of pieces is straightforward. Here is an example:
    alg->UpdateInformation();
    vtkStreamingDemandDrivenPipeline* exec =
      vtkStreamingDemandDrivenPipeline::SafeDownCast(alg->GetExecutive());
    if (!exec || exec->GetMaximumNumberOfPieces() < 10)
    {
      // The pipeline cannot be split into 10 pieces; bail out.
      die();
    }
    for (int i = 0; i < 10; i++)
    {
      // Arguments: port 0, piece i of 10, no ghost levels.
      exec->SetUpdateExtent(0, i, 10, 0);
      alg->Update();
      // do something with alg->GetOutputDataObject(0)
    }
This example streams a pipeline that ends with the algorithm alg in 10 pieces. The best example of streaming pieces is vtkPolyDataMapper:
    void vtkPolyDataMapper::Render(vtkRenderer *ren, vtkActor *act)
    {
      // ...
      nPieces = this->NumberOfPieces * this->NumberOfSubPieces;
      for (int i = 0; i < this->NumberOfSubPieces; i++)
      {
        // If there is more than one piece, render in a loop.
        currentPiece = this->NumberOfSubPieces * this->Piece + i;
        input->SetUpdateExtent(currentPiece, nPieces, this->GhostLevel);
        this->RenderPiece(ren, act);
      }
    }
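The index arithmetic in the loop above can be checked in isolation. The following is a minimal standalone sketch, not VTK code: `PieceRequest` and `SubPieceRequests` are hypothetical names introduced here only to list the (piece, number of pieces) pairs that the loop would pass to SetUpdateExtent().

```cpp
#include <cassert>
#include <vector>

// Hypothetical type, not part of VTK: one (piece, numPieces) request.
struct PieceRequest
{
  int piece;     // global piece index requested from the input
  int numPieces; // total number of pieces in the decomposition
};

// Hypothetical helper, not part of VTK: mirrors the arithmetic of the
// Render() loop and returns the requests it would issue.
std::vector<PieceRequest> SubPieceRequests(int piece, int numberOfPieces,
                                           int numberOfSubPieces)
{
  assert(numberOfPieces > 0 && numberOfSubPieces > 0);
  assert(piece >= 0 && piece < numberOfPieces);

  // Every piece is split again into numberOfSubPieces sub-pieces.
  const int nPieces = numberOfPieces * numberOfSubPieces;

  std::vector<PieceRequest> requests;
  for (int i = 0; i < numberOfSubPieces; i++)
  {
    const int currentPiece = numberOfSubPieces * piece + i;
    requests.push_back({currentPiece, nPieces});
  }
  return requests;
}
```

For example, with Piece = 1, NumberOfPieces = 2 and NumberOfSubPieces = 3, the helper returns pieces 3, 4 and 5 out of 6, so the sub-pieces of one mapper never overlap with those of the mapper that owns piece 0.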
For a good example of streaming extents, see vtkMemoryLimitImageDataStreamer and its superclass vtkImageDataStreamer.
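To illustrate what streaming by extent means, here is a minimal standalone sketch that cuts a structured extent into slabs along z. It is an assumption made for illustration, not the algorithm used by vtkImageDataStreamer: `Extent` and `SlabExtent` are hypothetical names, and the slabs here are disjoint, whereas VTK's own extent splitting may divide the data differently.

```cpp
#include <array>
#include <cassert>

// Extent layout assumed here: {xmin, xmax, ymin, ymax, zmin, zmax}, inclusive.
using Extent = std::array<int, 6>;

// Hypothetical helper, not part of VTK: returns the sub-extent covered by
// `piece` out of `numPieces` when the whole extent is cut into z slabs.
Extent SlabExtent(const Extent& whole, int piece, int numPieces)
{
  assert(numPieces > 0 && piece >= 0 && piece < numPieces);

  const int zMin = whole[4];
  const int zCount = whole[5] - whole[4] + 1; // number of z slices
  assert(numPieces <= zCount);                // every slab gets a slice

  Extent sub = whole;
  // Integer division spreads any remainder across the slabs.
  sub[4] = zMin + (piece * zCount) / numPieces;
  sub[5] = zMin + ((piece + 1) * zCount) / numPieces - 1;
  return sub;
}
```

With a whole extent of {0, 9, 0, 9, 0, 9} and three pieces, the z ranges are [0, 2], [3, 5] and [6, 9]. A streaming loop would request each sub-extent in turn and process the result before asking for the next one, so that only one slab is in memory at a time.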
Streaming time steps
This section shows how a filter can stream multiple time steps, using vtkTemporalStatistics as the example.
Streaming happens over multiple executions of the filter. Therefore, we need a way of keeping track of which time step we are currently processing. For this purpose, vtkTemporalStatistics uses a data member called CurrentTimeIndex. CurrentTimeIndex is initialized to 0 in the constructor.
    vtkTemporalStatistics::vtkTemporalStatistics()
    {
      this->CurrentTimeIndex = 0;
    }
In RequestUpdateExtent(), we request the time step at the current time index. The index starts at 0 and is incremented as streaming proceeds.
    int vtkTemporalStatistics::RequestUpdateExtent(
      vtkInformation *vtkNotUsed(request),
      vtkInformationVector **inputVector,
      vtkInformationVector *vtkNotUsed(outputVector))
    {
      vtkInformation *inInfo = inputVector[0]->GetInformationObject(0);

      double *inTimes = inInfo->Get(vtkStreamingDemandDrivenPipeline::TIME_STEPS());
      if (inTimes)
      {
        inInfo->Set(vtkStreamingDemandDrivenPipeline::UPDATE_TIME_STEPS(),
                    &inTimes[this->CurrentTimeIndex], 1);
      }

      return 1;
    }
In RequestData(), we first process the current time step. We then increment the current time index and set CONTINUE_EXECUTING if there are more time steps to process. This tells the executive to perform another pass of REQUEST_UPDATE_EXTENT and REQUEST_DATA, which leads to another call to RequestUpdateExtent() and RequestData() with the next time index. The cycle continues until the filter removes CONTINUE_EXECUTING from the request.
    int vtkTemporalStatistics::RequestData(vtkInformation *request,
                                           vtkInformationVector **inputVector,
                                           vtkInformationVector *outputVector)
    {
      vtkInformation *inInfo = inputVector[0]->GetInformationObject(0);
      vtkInformation *outInfo = outputVector->GetInformationObject(0);

      vtkDataObject *input = vtkDataObject::GetData(inInfo);
      vtkDataObject *output = vtkDataObject::GetData(outInfo);

      if (this->CurrentTimeIndex == 0)
      {
        // First execution, initialize arrays.
        this->InitializeStatistics(input, output);
      }
      else
      {
        // Subsequent execution, accumulate new data.
        this->AccumulateStatistics(input, output);
      }

      this->CurrentTimeIndex++;

      if (this->CurrentTimeIndex
          < inInfo->Length(vtkStreamingDemandDrivenPipeline::TIME_STEPS()))
      {
        // There is still more to do.
        request->Set(vtkStreamingDemandDrivenPipeline::CONTINUE_EXECUTING(), 1);
      }
      else
      {
        // We are done. Finish up.
        this->PostExecute(input, output);
        request->Remove(vtkStreamingDemandDrivenPipeline::CONTINUE_EXECUTING());
        this->CurrentTimeIndex = 0;
      }

      return 1;
    }
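The control flow between the executive and the filter can be hard to see from the filter code alone. The following standalone sketch imitates it without VTK. Everything in it is a stand-in introduced for illustration: `MockRequest` replaces the request information object, `MockTemporalAverage` replaces the filter, and `RunMockExecutive` plays the role of the executive by repeating the two passes for as long as the continue flag is set. The real executive does considerably more than this loop.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Stand-in for the request object: it only carries the continue flag.
struct MockRequest
{
  bool continueExecuting = false;
};

// Stand-in for a temporal filter that averages one value per time step.
class MockTemporalAverage
{
public:
  // Mirrors RequestUpdateExtent(): choose which time step to ask for.
  std::size_t RequestUpdateExtent() const { return this->CurrentTimeIndex; }

  // Mirrors RequestData(): consume one time step, then decide whether
  // another pass is needed.
  void RequestData(MockRequest& request, double value, std::size_t numSteps)
  {
    if (this->CurrentTimeIndex == 0)
    {
      this->Sum = value; // first execution, initialize
    }
    else
    {
      this->Sum += value; // subsequent execution, accumulate
    }

    this->CurrentTimeIndex++;

    if (this->CurrentTimeIndex < numSteps)
    {
      request.continueExecuting = true; // there is still more to do
    }
    else
    {
      // Done: finish up and reset so that the next update starts over.
      this->Average = this->Sum / static_cast<double>(numSteps);
      request.continueExecuting = false;
      this->CurrentTimeIndex = 0;
    }
  }

  double GetAverage() const { return this->Average; }

private:
  std::size_t CurrentTimeIndex = 0;
  double Sum = 0.0;
  double Average = 0.0;
};

// Stand-in for the executive: repeat both passes while the flag is set.
double RunMockExecutive(MockTemporalAverage& filter,
                        const std::vector<double>& timeStepValues)
{
  assert(!timeStepValues.empty());
  MockRequest request;
  do
  {
    const std::size_t index = filter.RequestUpdateExtent();
    filter.RequestData(request, timeStepValues[index], timeStepValues.size());
  } while (request.continueExecuting);
  return filter.GetAverage();
}
```

Resetting the index in the final pass matters: without it, a second update of the same filter would start from a stale index instead of from the first time step.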

