viernes, 29 de julio de 2016

Sincronización de Threads. Subscribe & Notify


Subscribe  & Notify

Anteriormente vimos un mutex trabajando como un semáforo, ahora veremos que es eso de la suscripción de mensajes.

hb_mutexSubscribe( <pMtx>, [ <nTimeOut> ] [, @<xSubscribed> ] ) -> <lSubscribed>

Dentro de un thread,  suscribimos el thread para ser avisado por una notificación, que  lógicamente, vendrá desde otro hilo distinto, por lo tanto , hb_mutexSubscribe y hb_mutexNotify corren en hilos distintos, y lo que comparten en común es un mutex., <pMtx>.

Si hay notificaciones pendientes, continua la ejecución del thread, si no, estará suspendido a la espera de una notificación, que puede especificar cuantos segundos esperaremos a ser
notificados, <nTimeOut>, por defecto, se queda indefinidamente.
En la variable pasada por referencia, @<xSubscribed>, vamos a obtener el valor que nos
envía desde la notificación. Retorna si tuvo éxito la subscripción al mutex o no, <lSubscribed>

hb_mutexSubscribeNow( <pMtx>, [ <nTimeOut> ] [, @<xSubscribed> ] ) -> <lSubscribed>
El funcionamiento es similar hb_mutexSubscribe, la única diferencia es que antes de comenzar, borra todas las notificaciones pendientes.

Antes de ver un par de ejemplos, vamos a seguir explicando con  hb_mutexNotify.

hb_mutexNotify( <pMtx> [, <xVal>] ) -> NIL
Emite una notificación a todos los threads esperando que están suscritos al mutex, <pMtx>.
Pero, por cada notificación sólo uno de los threads responderá, por lo tanto, tendremos que ir
lanzando notificaciones para que diferentes threads vayan respondiendo.
Si no hay threads a la espera, se van quedando en una cola, que serán enviados conforme se vayan suscribiendo los threads que queramos.
Podemos enviar un parámetro , <xVal>, que como dijimos posteriormente, será recibido por la variable pasada por referencia en hb_mutexSubscribe,  @<xSubscribed>
A tener en cuenta que no existe ningún tipo de relación entre el orden de suscripción y el orden de notificación.

Hay un ejemplo de esto en harbour/contrib/mt/mttest07.prg  donde podéis observar lo aquí explicado, y aunque es ‘duro’ de entender si no sabemos nada, os pongo una versión ‘modificada’ con anotaciones y con pausas, para poder observar que está realizando y observar el comportamiento para intentar asimilarlo con tranquilidad ;-)
En el ejemplo de Harbour, podéis observar como se autoalimentan el hijo del padre y el padre del hijo a través de 2 mutex.

#define N_THREADS 2
#define N_JOBS    5

static s_mtxJobs, s_Result := 0

proc main()
  local aThreads, i, nDigit
  cls

  ? "Main start"

  aThreads := {}
  s_mtxJobs := hb_mutexCreate()

  ? "Arrancamos Thread. "
  ? "Cuando se arranquen los hilos, estos estaran suspendidos a la espera"
  ? "de la notificacion"
  ? "Pulsa una tecla para continuar"
  inkey( 0)

  for i := 1 to N_THREADS
     ? "Thread <" + hb_ntos( i ) + ">"
     aadd( aThreads, hb_threadStart( @thFunc() ) )
  next
  
  ? "Pulsa tecla para empezar a enviar Notificaciones"
  ? "En cada Notificacion, veras que se ejecuta <Run Thread by Nofify> que"
  ? "parte debajo de hb_mutexSubscribe en la funcion thFunc()"
  ? ""
   inkey(0)

  nDigit := 1
  for i := 1 to N_JOBS
     ? "Notifica Job: <" + hb_ntos( i ) + "> Pulsa una tecla para continuar"
     hb_mutexNotify( s_mtxJobs, nDigit )
    inkey( 0 )
     nDigit++
  next
 

  ? "Ahora vamos a enviar NIL, para salir del bucle while de la funcion thFunc()."
  for i := 1 to N_THREADS
     hb_mutexNotify( s_mtxJobs, NIL )
     //?? "<" + hb_ntos( i ) + ">"
  next

  ? "Esperando a los threads..."
  aEval( aThreads, {| x | hb_threadJoin( x ) } )
  ? "Threads se unieron al principal"
   
  ? "Value Total:", s_Result
  ? "End of main"
return

proc thFunc()
  local xJob

  while .T.
     ? "Thread Subscribe: " + "0x" + hb_NumToHex( hb_threadSelf() )
     hb_mutexSubscribe( s_mtxJobs,, @xJob )
     ? "Run Thread by Nofify"
     if xJob == NIL
         ?? "..... exit thread......."
        exit
     endif
    
     // Si no protejo la variable, el valor final puede tener un valor inesperado
     hb_mutexLock( s_mtxJobs )
     s_Result += xJob
     hb_mutexUnLock( s_mtxJobs )
    
  enddo

return


hb_mutexNotifyAll( <pMtx> [, <xVal>] ) -> NIL
Emite una notificación a todos los threads que estén a la espera.
Si no hay threads, no se ejecuta funcion alguna, no se agregan ni se quitan notificaciones que estuviesen en la cola.

#define N_THREADS 5

static s_mtxJobs

proc main()
  local aThreads, i
  cls

  ? "Main start"

  aThreads := {}
  s_mtxJobs := hb_mutexCreate()

  for i := 1 to N_THREADS
     aadd( aThreads, hb_threadStart( @thFunc() ) )
  next

  ? "Ahora vamos a enviar notificacion a todos los hilos."
  inkey( 1 )
  hb_mutexNotifyAll( s_mtxJobs )
  
  ? "Esperando a los threads..."
  aEval( aThreads, {| x | hb_threadJoin( x ) } )
  ? "Threads se unieron al principal"

  ? "End of main"
return

proc thFunc()

  ? "Thread Subscribe: " + "0x" + hb_NumToHex( hb_threadSelf() )
  hb_mutexSubscribe( s_mtxJobs )
  ? "Run Thread by Nofify & dead " + "0x" + hb_NumToHex( hb_threadSelf() )
  
return



Y por último, vemos esta función;
hb_mutexEval( <pMtx>, <bCode> | <@sFunc()> [, <params,...> ] ) -> <xCodeResult>
Aparentemente, lo que hace es ejecutar un codeblock , protegiendo el contenido entre hilos.

No hay comentarios:

Publicar un comentario

Android y Git. Disponer del hash automáticamente.

Una de las cosas a las que estoy acostumbrado, es tener siempre en mi código, el hash/tag/versión del control de versiones que estoy usan...