Subscribe & Notify
Anteriormente vimos un mutex trabajando como un semáforo, ahora veremos que es eso de la suscripción de mensajes.
hb_mutexSubscribe( <pMtx>, [ <nTimeOut> ] [, @<xSubscribed> ] ) -> <lSubscribed>
Dentro de un thread, suscribimos el thread para ser avisado por una notificación, que lógicamente, vendrá desde otro hilo distinto, por lo tanto , hb_mutexSubscribe y hb_mutexNotify corren en hilos distintos, y lo que comparten en común es un mutex., <pMtx>.
Si hay notificaciones pendientes, continua la ejecución del thread, si no, estará suspendido a la espera de una notificación, que puede especificar cuantos segundos esperaremos a ser
notificados, <nTimeOut>, por defecto, se queda indefinidamente.
En la variable pasada por referencia, @<xSubscribed>, vamos a obtener el valor que nos
envía desde la notificación. Retorna si tuvo éxito la subscripción al mutex o no, <lSubscribed>
hb_mutexSubscribeNow( <pMtx>, [ <nTimeOut> ] [, @<xSubscribed> ] ) -> <lSubscribed>
El funcionamiento es similar hb_mutexSubscribe, la única diferencia es que antes de comenzar, borra todas las notificaciones pendientes.
Antes de ver un par de ejemplos, vamos a seguir explicando con hb_mutexNotify.
hb_mutexNotify( <pMtx> [, <xVal>] ) -> NIL
Emite una notificación a todos los threads esperando que están suscritos al mutex, <pMtx>.
Pero, por cada notificación sólo uno de los threads responderá, por lo tanto, tendremos que ir
lanzando notificaciones para que diferentes threads vayan respondiendo.
Si no hay threads a la espera, se van quedando en una cola, que serán enviados conforme se vayan suscribiendo los threads que queramos.
Podemos enviar un parámetro , <xVal>, que como dijimos posteriormente, será recibido por la variable pasada por referencia en hb_mutexSubscribe, @<xSubscribed>
A tener en cuenta que no existe ningún tipo de relación entre el orden de suscripción y el orden de notificación.
Hay un ejemplo de esto en harbour/contrib/mt/mttest07.prg donde podéis observar lo aquí explicado, y aunque es ‘duro’ de entender si no sabemos nada, os pongo una versión ‘modificada’ con anotaciones y con pausas, para poder observar que está realizando y observar el comportamiento para intentar asimilarlo con tranquilidad ;-)
En el ejemplo de Harbour, podéis observar como se autoalimentan el hijo del padre y el padre del hijo a través de 2 mutex.
#define N_THREADS 2
#define N_JOBS 5
static s_mtxJobs, s_Result := 0
proc main()
local aThreads, i, nDigit
cls
? "Main start"
aThreads := {}
s_mtxJobs := hb_mutexCreate()
? "Arrancamos Thread. "
? "Cuando se arranquen los hilos, estos estaran suspendidos a la espera"
? "de la notificacion"
? "Pulsa una tecla para continuar"
inkey( 0)
for i := 1 to N_THREADS
? "Thread <" + hb_ntos( i ) + ">"
aadd( aThreads, hb_threadStart( @thFunc() ) )
next
? "Pulsa tecla para empezar a enviar Notificaciones"
? "En cada Notificacion, veras que se ejecuta <Run Thread by Nofify> que"
? "parte debajo de hb_mutexSubscribe en la funcion thFunc()"
? ""
inkey(0)
nDigit := 1
for i := 1 to N_JOBS
? "Notifica Job: <" + hb_ntos( i ) + "> Pulsa una tecla para continuar"
hb_mutexNotify( s_mtxJobs, nDigit )
inkey( 0 )
nDigit++
next
? "Ahora vamos a enviar NIL, para salir del bucle while de la funcion thFunc()."
for i := 1 to N_THREADS
hb_mutexNotify( s_mtxJobs, NIL )
//?? "<" + hb_ntos( i ) + ">"
next
? "Esperando a los threads..."
aEval( aThreads, {| x | hb_threadJoin( x ) } )
? "Threads se unieron al principal"
? "Value Total:", s_Result
? "End of main"
return
proc thFunc()
local xJob
while .T.
? "Thread Subscribe: " + "0x" + hb_NumToHex( hb_threadSelf() )
hb_mutexSubscribe( s_mtxJobs,, @xJob )
? "Run Thread by Nofify"
if xJob == NIL
?? "..... exit thread......."
exit
endif
// Si no protejo la variable, el valor final puede tener un valor inesperado
hb_mutexLock( s_mtxJobs )
s_Result += xJob
hb_mutexUnLock( s_mtxJobs )
enddo
return
|
hb_mutexNotifyAll( <pMtx> [, <xVal>] ) -> NIL
Emite una notificación a todos los threads que estén a la espera.
Si no hay threads, no se ejecuta funcion alguna, no se agregan ni se quitan notificaciones que estuviesen en la cola.
#define N_THREADS 5
static s_mtxJobs
proc main()
local aThreads, i
cls
? "Main start"
aThreads := {}
s_mtxJobs := hb_mutexCreate()
for i := 1 to N_THREADS
aadd( aThreads, hb_threadStart( @thFunc() ) )
next
? "Ahora vamos a enviar notificacion a todos los hilos."
inkey( 1 )
hb_mutexNotifyAll( s_mtxJobs )
? "Esperando a los threads..."
aEval( aThreads, {| x | hb_threadJoin( x ) } )
? "Threads se unieron al principal"
? "End of main"
return
proc thFunc()
? "Thread Subscribe: " + "0x" + hb_NumToHex( hb_threadSelf() )
hb_mutexSubscribe( s_mtxJobs )
? "Run Thread by Nofify & dead " + "0x" + hb_NumToHex( hb_threadSelf() )
return
|
Y por último, vemos esta función;
hb_mutexEval( <pMtx>, <bCode> | <@sFunc()> [, <params,...> ] ) -> <xCodeResult>
Aparentemente, lo que hace es ejecutar un codeblock , protegiendo el contenido entre hilos.
No hay comentarios:
Publicar un comentario