Update: Library-AsyncRunspace.ps1

I found a few bugs in Library-AsyncRunspace version 1.1 and added a couple of new functions, so it’s time for an update. The current version is 1.3; yes, I skipped an update. Version 1.2 added the Stop-AsyncPipe function, which can be used whenever a pipeline gets stuck in an infinite loop. Version 1.3 added the Write-AsyncMessage function, and each new runspace now also has a Read-AsyncMessage function. Some minor bugs were also fixed.

Each runspace now has an object queue associated with it. Write-AsyncMessage enqueues an object into that queue, and Read-AsyncMessage, called from inside the runspace, dequeues objects from it in the order they were added (it returns $null when the queue is empty). The purpose of the queue is to make it easy to send control messages into a runspace that is working on a pipeline of indefinite length.
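The queue behaviour described above can be tried outside the library. This is a minimal sketch, not the library itself: it builds the same kind of synchronized queue that New-AsyncRunspace attaches to each runspace, and `Read-DemoMessage` is a hypothetical stand-in for Read-AsyncMessage. The message names are made up for illustration.

```powershell
# Same construction the library uses for each runspace's MessageQueue.
$queue = [Collections.Queue]::Synchronized((New-Object Collections.Queue))

# Enqueue three control messages (hypothetical names, for illustration only).
'start', 'pause', 'stop' | ForEach-Object { $queue.Enqueue($_) }

# Hypothetical stand-in for Read-AsyncMessage: dequeue, or $null when empty.
function Read-DemoMessage {
    if ($queue.Count) { return $queue.Dequeue() }
    else { return $null }
}

# Drain the queue; messages come back in the order they were written.
$received = @()
while ($queue.Count) { $received += Read-DemoMessage }
$received -join ','   # → start,pause,stop
```

Because an empty queue yields $null, a reader cannot tell "no message" apart from a $null message, so it is probably best not to send $null as a message.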

This feature has no single intended use; use it however you like. For example, a server could run in a background runspace and periodically check its queue for instructions on what to do next.
All other changes are bug fixes.
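The background-server idea can be sketched without the library, using only the runspace and pipeline classes it is built on. The assumptions here are mine: the message names (`job-1`, `job-2`, `quit`) and the worker loop are hypothetical, and the queue is handed to the runspace with `SessionStateProxy.SetVariable` as a shortcut, whereas the library passes it in through pipeline input.

```powershell
# Shared, thread-safe queue of the same type the library attaches to a runspace.
$queue = [Collections.Queue]::Synchronized((New-Object Collections.Queue))

$runspace = [Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace()
$runspace.Open()

# Shortcut (assumption): expose the queue inside the runspace as $MessageQueue.
$runspace.SessionStateProxy.SetVariable('MessageQueue', $queue)

# Hypothetical worker: poll the queue, handle each message, stop on 'quit'.
$worker = $runspace.CreatePipeline(@'
while ($true) {
    if ($MessageQueue.Count) {
        $msg = $MessageQueue.Dequeue()
        if ($msg -eq 'quit') { break }
        "handled: $msg"
    }
    else { Start-Sleep -Milliseconds 50 }
}
'@)
$worker.InvokeAsync()
$worker.Input.Close()   # no input to send; the script waits until input is closed

# Send control messages from the foreground while the worker runs.
'job-1', 'job-2', 'quit' | ForEach-Object { $queue.Enqueue($_) }

# Collect output until the worker finishes, much as End-AsyncPipe does.
$results = @()
while (-not $worker.Output.EndOfPipeline) {
    $results += $worker.Output.NonBlockingRead()
    Start-Sleep -Milliseconds 50
}
$runspace.Close()
$results
```

With the library loaded, the same pattern would presumably use Start-AsyncPipe for the worker, Write-AsyncMessage to send, Read-AsyncMessage inside the loop, and End-AsyncPipe to collect the output.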

Here is the code:

# Library-AsyncRunspace
# Version: 1.3

# Include with {. Library-AsyncRunspace}

# Suggested aliases…
# New-Alias eap End-AsyncPipe
# New-Alias gar Get-AsyncRunspace
# New-Alias wam Write-AsyncMessage
# New-Alias r! Invoke-ExpressionInRunspace
# New-Alias iar Invoke-ExpressionInRunspace
# New-Alias nar New-AsyncRunspace
# New-Alias rap Read-AsyncPipe
# New-Alias rar Remove-AsyncRunspace
# New-Alias sap Start-AsyncPipe

# Don’t destroy previous runspace catalog
if ( -not $RunspaceCatalog ) {

# Declare the runspace catalog
$RunspaceCatalog = New-Object PSObject
$RunspaceCatalog | Add-Member NoteProperty RunspaceCountPosition 0 -PassThru |
 Add-Member NoteProperty RunspaceList @{}

function New-AsyncRunspace {
 param ([String]$NewName)
 
 # Increment id number counter
 $RunspaceCatalog.RunspaceCountPosition++
 
 # Generate a runspace and add useful properties
 $NewRunspace = [management.automation.runspaces.runspacefactory]::CreateRunspace() |
  Add-Member NoteProperty Name $NewName -PassThru |
  Add-Member NoteProperty CurrentPipe $null -PassThru |
  Add-Member NoteProperty MessageQueue ([Collections.Queue]::Synchronized($(New-Object Collections.Queue))) -PassThru |
  Add-Member ScriptProperty PipeState { $this.CurrentPipe.PipelineStateInfo.State } -PassThru |
  Add-Member ScriptProperty PipeStateReason { $this.CurrentPipe.PipelineStateInfo.Reason } -PassThru |
  Add-Member ScriptProperty CurrentCommands { "{$($this.CurrentPipe.Commands)}" } -PassThru
 $NewRunspace.Open()
 
 # Add a reference to the message queue and a function to read it to the runspace
 $PipeLine = $NewRunspace.CreatePipeline({
  $MessageQueue = $input | Select-Object -first 1
  function Read-AsyncMessage {
   if ($MessageQueue.Count) { return $MessageQueue.Dequeue() }
   else { return $null }
  }
 })
 [void]$Pipeline.Input.Write($NewRunspace.MessageQueue)
 $Pipeline.Input.Close()
 $PipeLine.Invoke()
 
 # Add new runspace to catalog
 $RunspaceCatalog.RunspaceList[$RunspaceCatalog.RunspaceCountPosition] = $NewRunspace
 
 # Report new runspace id number
 $RunspaceCatalog.RunspaceCountPosition
}

function Remove-AsyncRunspace {
 param ($RunspaceID = (read-host "Enter a valid runspace ID or name."))

 if ( -not ($RunspaceID -is [Int32] -or $RunspaceID -is [String])) { throw "A runspace id must be supplied by integer or name!" }
 
 #Find selected runspace by number
 if ($RunspaceID -is [Int32]) {
  $RunspaceCatalog.RunspaceList[$RunspaceID].Close()
  $RunspaceCatalog.RunspaceList.Remove($RunspaceID)
 }
 #Or find selected runspace by name
 else {
  # Copy the keys first; removing entries while enumerating the live Keys collection throws
  $keys = @($RunspaceCatalog.RunspaceList.Keys)
  $keys | Foreach-Object {
   if ( $RunspaceCatalog.RunspaceList[$_].Name -like $RunspaceID ) {
    $RunspaceCatalog.RunspaceList[$_].Close()
    $RunspaceCatalog.RunspaceList.Remove($_)
   }
  }
 }
}

function Get-AsyncRunspace {
 param ($RunspaceID)
 
 # Return the entire list of available runspaces by default
 if ( $RunspaceID -eq $null ) {
  return $RunspaceCatalog.RunspaceList.Values
 }
 
 # Type check on id
 if ( -not ($RunspaceID -is [Int32] -or $RunspaceID -is [String])) { throw "A runspace id must be supplied by integer or name!" }
 
 #Find selected runspace by number
 if ($RunspaceID -is [Int32]) {
  $SelectedRunspace = $RunspaceCatalog.RunspaceList[$RunspaceID]
 }
 #Or find selected runspace by name
 else {
  $SelectedRunspace = $RunspaceCatalog.RunspaceList.Values | Where-Object { $_.Name -like $RunspaceID }
 }
 
 # Return result
 $SelectedRunspace
}

function Write-AsyncMessage {
 param ($RunspaceID = (read-host "Enter a valid runspace ID or name."), [Object]$Message)
 
 $SelectedRunspace = Get-AsyncRunspace $RunspaceID | Select-Object -first 1
 
 # Throw error if no runspace was found with the given id
 if ( ($SelectedRunspace | Measure-Object).Count -eq 0 ) { throw "Runspace not found!" }

 # Append $Message to message queue
 $SelectedRunspace.MessageQueue.Enqueue($Message)
}

function Invoke-ExpressionInRunspace {
 param ($RunspaceID = (read-host "Enter a valid runspace ID or name."), [String]$Command = (read-host "Enter a command to be executed in the runspace."))
 
 $SelectedRunspace = Get-AsyncRunspace $RunspaceID | Select-Object -first 1
 
 # Throw error if no runspace was found with the given id
 if ( ($SelectedRunspace | Measure-Object).Count -eq 0 ) { throw "Runspace not found!" }
 
 # Insert the command into the runspace.
 $Pipeline = $SelectedRunspace.CreatePipeline($Command)

 # Insert input into the new pipe
 $input | Foreach-Object { [void]$Pipeline.Input.Write($_) }
 $Pipeline.Input.Close()
 
 # Execute pipe
 $Pipeline.Invoke()
 
 # Read pipeline result
 if ($Pipeline.PipelineStateInfo.State -eq [management.automation.runspaces.pipelinestate]::failed) {
  throw $Pipeline.PipelineStateInfo.Reason
 }
 $Pipeline.Error.ReadToEnd() | Foreach-Object { Write-Error $_ }
 $Pipeline.Output.ReadToEnd()
}

function Start-AsyncPipe {
 param (
  $RunspaceID = (read-host "Enter a valid runspace ID or name."),
  [String]$Command = (read-host "Enter a command to be executed in the runspace.")
 )

 BEGIN {
  $SelectedRunspace = Get-AsyncRunspace $RunspaceID | Select-Object -first 1
  
  # Throw error if no runspace was found with the given id
  if ( ($SelectedRunspace | Measure-Object).Count -eq 0 ) { throw "Runspace not found!" }
  
  # Insert the command into the runspace.
  $Pipeline = $SelectedRunspace.CreatePipeline($Command)
  
  # Start async process
  $Pipeline.InvokeAsync()
  
  # List this pipe as current with the runspace
  $SelectedRunspace.CurrentPipe = $Pipeline
 }
 
 PROCESS {
  # Insert input into the new pipe
  [void]$Pipeline.Input.Write($_)
 }
 
 END {
  # Close input
  $Pipeline.Input.Close()
 }
}

function Read-AsyncPipe {
 param ($RunspaceID = (read-host "Enter a valid runspace ID or name."))
 $SelectedRunspace = Get-AsyncRunspace $RunspaceID | Select-Object -first 1

 # Throw error if no runspace was found with the given id
 if ( ($SelectedRunspace | Measure-Object).Count -eq 0 ) { throw "Runspace not found!" }

 if ($SelectedRunspace.CurrentPipe) {
  # Read pipeline result
  if ($SelectedRunspace.CurrentPipe.pipelineStateInfo.state -eq [management.automation.runspaces.pipelinestate]::failed) {
   throw $SelectedRunspace.CurrentPipe.PipelineStateInfo.Reason
  }
  $SelectedRunspace.CurrentPipe.Error.NonBlockingRead() | Foreach-Object { Write-Error $_ }
  $SelectedRunspace.CurrentPipe.Output.NonBlockingRead()
 }
}

function Stop-AsyncPipe {
 param ($RunspaceID = (read-host "Enter a valid runspace ID or name."))

 $SelectedRunspace = Get-AsyncRunspace $RunspaceID | Select-Object -first 1
 
 # Throw error if no runspace was found with the given id
 if ( ($SelectedRunspace | Measure-Object).Count -eq 0 ) { throw "Runspace not found!" }
 
 if ( $SelectedRunspace.CurrentPipe ) {
  $SelectedRunspace.CurrentPipe.StopAsync()

  # Pipeline is no longer current
  $SelectedRunspace.CurrentPipe = $null
 }
}

function End-AsyncPipe {
 param ($RunspaceID = (read-host "Enter a valid runspace ID or name."))

 $SelectedRunspace = Get-AsyncRunspace $RunspaceID | Select-Object -first 1
 
 # Throw error if no runspace was found with the given id
 if ( ($SelectedRunspace | Measure-Object).Count -eq 0 ) { throw "Runspace not found!" }

 # Check current pipeline
 if ($SelectedRunspace.CurrentPipe) {
  # Pipeline is no longer current
  $Pipeline = $SelectedRunspace.CurrentPipe
  $SelectedRunspace.CurrentPipe = $null
 
  # Read pipeline result
  if ($Pipeline.PipelineStateInfo.State -eq [management.automation.runspaces.pipelinestate]::failed) {
   throw $Pipeline.PipelineStateInfo.Reason
  }
  while ( -not $Pipeline.Output.EndOfPipeline -or -not $Pipeline.Error.EndOfPipeline ) {
   $Pipeline.Error.NonBlockingRead() | Foreach-Object { Write-Error $_ }
   $Pipeline.Output.NonBlockingRead()
   sleep -m 100
  }
 }
}

} # End if ( -not $RunspaceCatalog )
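The practical difference between Invoke-ExpressionInRunspace and Start-AsyncPipe comes down to `Invoke()` versus `InvokeAsync()` on the pipeline. The following is a rough sketch of that difference using the raw pipeline classes rather than the library functions; the half-second `Start-Sleep` script is a made-up workload, and the timings it prints will vary from machine to machine.

```powershell
$rs1 = [Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace()
$rs2 = [Management.Automation.Runspaces.RunspaceFactory]::CreateRunspace()
$rs1.Open()
$rs2.Open()

# Hypothetical workload: wait half a second, then emit a result.
$script = 'Start-Sleep -Milliseconds 500; "done"'

# Asynchronous (what Start-AsyncPipe does): both pipelines start right away.
$timer = [Diagnostics.Stopwatch]::StartNew()
$pipes = foreach ($rs in $rs1, $rs2) {
    $p = $rs.CreatePipeline($script)
    $p.InvokeAsync()
    $p.Input.Close()
    $p
}
# Poll for output until each pipeline has finished (what End-AsyncPipe does).
$asyncOutput = foreach ($p in $pipes) {
    while (-not $p.Output.EndOfPipeline) {
        $p.Output.NonBlockingRead()
        Start-Sleep -Milliseconds 50
    }
}
$asyncMs = $timer.ElapsedMilliseconds

# Synchronous (what Invoke-ExpressionInRunspace does): Invoke() blocks,
# so the second pipeline cannot start until the first has finished.
$timer = [Diagnostics.Stopwatch]::StartNew()
$syncOutput = foreach ($rs in $rs1, $rs2) {
    $p = $rs.CreatePipeline($script)
    $p.Input.Close()
    $p.Invoke()
}
$syncMs = $timer.ElapsedMilliseconds

$rs1.Close()
$rs2.Close()
"async: $asyncMs ms, sync: $syncMs ms"
```

The synchronous pass should take at least about a second, since the two half-second waits run back to back, while the asynchronous pass lets them overlap.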

~ by lunaticexperimentalist on January 31, 2007.

3 Responses to “Update: Library-AsyncRunspace.ps1”

  1. I used your functions in my perf script but didn’t see the Async execution of scripts as you intended. Here is my script (the DNSPerf.ps1 in it is a script I wrote to perf test DNS lookup):

     . .\Library-AsyncRunspace.ps1
     New-AsyncRunspace r1
     New-AsyncRunspace r2
     iar r1 {Add-PsSnapin PoshNet; $h = C:\Work\MSNAD\DNSPerf\DNSPerf.ps1 4.2.2.2 microsoft.com 10 1000}
     iar r2 {Add-PsSnapin PoshNet; $h = C:\Work\MSNAD\DNSPerf\DNSPerf.ps1 4.2.2.2 microsoft.com 15 1000}
     "$(iar r1 {$h})"
     "$(iar r2 {$h})"

     Here is the result:

     PS C:\DNSPerf> .\test.ps1
     1
     2
     Stats: 10 ms or less: 0 25 ms or less: 8 50 ms or less: 2 100 ms or less: 0 250 ms or less: 0 500 ms or less: 0 1000 ms or less: 0 2500 ms or less: 0 Bogus domain: 0 Error: 0 Begin: 6/11/2009 6:39:12 PM End: 6/11/2009 6:39:23 PM
     Stats: 10 ms or less: 0 25 ms or less: 13 50 ms or less: 1 100 ms or less: 0 250 ms or less: 0 500 ms or less: 0 1000 ms or less: 0 2500 ms or less: 0 Bogus domain: 0 Error: 1 Begin: 6/11/2009 6:39:23 PM End: 6/11/2009 6:39:39 PM

     As you can see from the timestamps, the second runspace (r2) started only after the first (r1) was done in a sequential fashion. Weren’t they supposed to execute in parallel (Async) unless I missed something?

  2. @Alex Y

     Invoke-ExpressionInRunspace (iar/r!) is intended to be used when you want to perform synchronous processing. iar may be used when you need to add or remove data from a runspace. When you need to perform some process asynchronously in the background, you will need to use the Start-AsyncPipe function.

     Start-AsyncPipe will write its command to the target runspace and begin processing that command immediately. Start-AsyncPipe will then continue to write its input pipe to the input pipe of the asynchronous pipe, and return once its input pipe has emptied. You can then use the Read-AsyncPipe function to read the output of the asynchronous pipe and return without blocking the calling script. You can also use End-AsyncPipe to read the output of the asynchronous pipe continuously, blocking until the command running in the asynchronous runspace has finished.

     I believe your script should look more like this:

     . .\Library-AsyncRunspace.ps1
     New-AsyncRunspace r1
     New-AsyncRunspace r2
     sap r1 {Add-PsSnapin PoshNet; $h = C:\Work\MSNAD\DNSPerf\DNSPerf.ps1 4.2.2.2 microsoft.com 10 1000; $h}
     sap r2 {Add-PsSnapin PoshNet; $h = C:\Work\MSNAD\DNSPerf\DNSPerf.ps1 4.2.2.2 microsoft.com 15 1000; $h}
     eap r1
     eap r2

     Here we use sap to start two processes, then use eap to block until both are finished, returning the results in the output of the current script.

  3. Your solution worked! Having more detailed documentation did help clear up some confusion. Thank you.
