MailboxProcessor.Scan<'Msg,'T> (Método de F#)

Busca un mensaje desplazándose por los mensajes según el orden de llegada hasta que una función dada devuelve un valor Some. Los demás mensajes permanecen en la cola.

Espacio de nombres/Ruta de acceso del módulo: Microsoft.FSharp.Control

Ensamblado: FSharp.Core (en FSharp.Core.dll)

// Signature:
member this.Scan : ('Msg -> Async<'T> option) * ?int -> Async<'T>

// Usage:
mailboxProcessor.Scan (scanner)
mailboxProcessor.Scan (scanner, timeout = timeout)

Parámetros

  • scanner
    Tipo: 'Msg -> Async<'T> option

    Función que devuelve None si se debe omitir el mensaje o Some si se debe procesar el mensaje y quitarlo de la cola.

  • timeout
    Tipo: int

    Tiempo de espera opcional en milisegundos. Su valor predeterminado es -1, lo que equivale a Infinite().

Excepciones

Excepción

Condition

TimeoutException

Se produce cuando se supera el tiempo de espera.

Valor devuelto

Cálculo asincrónico (objeto Async) que scanner compiló a partir del mensaje leído.

Comentarios

Este método se usa en el cuerpo del agente. Por cada agente, solo puede haber un lector simultáneo activo como máximo, por lo que no puede haber más de una llamada simultánea activa a Receive, TryReceive, Scan o TryScan. El cuerpo de la función scanner se bloquea durante su ejecución, pero dicho bloqueo se libera antes de la ejecución del flujo de trabajo asincrónico.

Ejemplo

En el siguiente ejemplo, se muestra cómo utilizar el método Scan. En este código, los agentes del procesador del buzón administran una serie de trabajos simulados que ejecutan y calculan un resultado.

open System

let numProcs = Environment.ProcessorCount

type Job<'Result> = int  * Async<'Result>

// Request to run a job, or
// Completed notification (with proc id and jobId
type RequestMessage<'Result> = 
   | Request of Job<'Result>
   | Completed of int * int

// Contains the id of the proc and the job
type RunMessage<'Result> = int * Job<'Result>

let random = System.Random()
// The program computes the Nth prime numbers for various
// values of N.
// This number determines how large values of N are.
let multiplier = 5000

// Generates mock jobs using Async.Sleep.
let createJob(id:int, computation, input:int) =
    let job = async {
        let result = computation(input)
        return result
        }
    id, job

let execAgents = Array.zeroCreate<MailboxProcessor<RunMessage<_>>> numProcs

let controllerAgent = new MailboxProcessor<RequestMessage<_>>(fun inbox ->
    // First try to identify an idle proc by calling tryFindIndex.
    // If there is an idle proc, scan for a request and run it.
    // If there is not an idle proc, scan for an idle notification.
    // No timeout given, so scan may wait indefinitely either to receive
    // a new request, or for a proc to signal that it's idle.  Meanwhile,
    // messages build up in the queue.
    // An array indicating whether each proc is idle.
    let idleStatus = Array.create numProcs true

    let rec loop (count) =
        async {
            let idleId = Array.tryFindIndex (fun elem -> elem) idleStatus
            match idleId with
            | Some id ->
                do! inbox.Scan(function | Request((jobId, _) as job) ->
                                            Some(async { 
                                                idleStatus.[id] <- false
                                                printfn "Job #%d submitted." jobId
                                                execAgents.[id].Post(id, job) })
                                        | Completed _ -> None)

            | None ->
                do! inbox.Scan(function | Request _ -> None
                                        | Completed (id, jobId) -> 
                                            Some(async { idleStatus.[id] <- true }))
            do! loop (count + 1)
        }
    loop 0)

for procId in 0 .. numProcs - 1 do
    execAgents.[procId] <- new MailboxProcessor<RunMessage<_>>(fun inbox ->
        let rec loop (count) =
            async {
                let! procId, (jobId, job) = inbox.Receive()
                // Start the job
                // Post to the controller inbox when complete.
                // The exception and cancellation continuations are not used.
                printfn "Job #%d started on procId %d." jobId procId
                Async.Start(async {
                    let! result = job
                    printfn "Job #%d completed." jobId
                    printfn "Nth Prime for N = %d is %s." (multiplier*jobId) (result.ToString())
                    controllerAgent.Post(Completed(procId, jobId))
                    })
                do! loop (count + 1)
                }
        loop 0)
    execAgents.[procId].Start()

controllerAgent.Start()

let numJobs = 10

printfn "Number Of Logical Processors: %d" numProcs

let isprime number = number > 1 && Seq.forall (fun n -> number % n <> 0) { 2 .. number/2 }

let nthPrime n = Seq.initInfinite (fun n -> n) 
               |> Seq.filter (fun n -> isprime n)
               |> Seq.nth (n - 1)

let rec loop (count) =
    let jobId = (numJobs - count)
    let job = createJob(jobId, (fun n -> nthPrime(n)), multiplier * jobId )
    printfn "Requesting job #%d" jobId
    controllerAgent.Post(Request(job))
    // Delay
    System.Threading.Thread.Sleep(1000);
    match count with
    | 0 -> ()
    | _ -> loop (count - 1)
loop (numJobs - 1)


printfn "Done submitting jobs. Press Enter to exit when ready."
Console.ReadLine() |> ignore

A continuación se muestra una sesión de ejemplo.

                                                  

Plataformas

Windows 7, Windows Vista SP2, Windows XP SP3, Windows XP x64 SP2, Windows Server 2008 R2, Windows Server 2008 SP2, Windows Server 2003 SP2

Información de versiones

Runtime de F#

Se admite en las versiones: 2.0, 4.0

Silverlight

Se admite en la versión: 3

Vea también

Referencia

Control.MailboxProcessor<'Msg> (Clase de F#)

Microsoft.FSharp.Control (Espacio de nombres de F#)

Historial de cambios

Fecha

Historial

Motivo

Enero de 2011

Se ha agregado un ejemplo de código.

Mejora de la información.

1 de abril de 2011

Información corregida sobre el comportamiento de tiempo de espera.

Corrección de errores de contenido.