viernes, 29 de julio de 2016

NOTAS FINALES de Threads en Harbour


Todo lo bueno tiene un final, y esta serie sobre threads en Harbour llega a su fin.
Espero que os haya sido ameno y entrenido, tanto como yo en escribirlo.

Es importante ver los ejemplos de Harbour, son muy ilustrativos , aunque para los que no estamos acostumbrados, pueden ser un poco lioso al inicio


Listado de ejemplos de Harbour



harbour/contrib/mt/mttest1.prg
  • Muestra el uso como detached local y valores complejos usando un thread

harbour/contrib/mt/mttest2.prg
  • Muestra el uso del comando QUIT y el estamento ALWAYS en ejecución. El thread hijo usa QUITt antes que el principal

harbour/contrib/mt/mttest3.prg
  • Muestra el uso del comando QUIT y el estamento ALWAYS en ejecución. El thread main usa QUITt antes que el thread hijo

harbour/contrib/mt/mttest4.prg
  • Muestra el uso de variables protegidas y sin proteger

harbour/contrib/mt/mttest5.prg
  • Muestra el uso thread static variables

harbour/contrib/mt/mttest6.prg
  • Usando variables tipo memvar con hilos

harbour/contrib/mt/mttest7.prg
  • Ejemplo de uso de Mutex para enviar/recibir mensajes entre hilos.

harbour/contrib/mt/mttest8.prg
  • Usando el compartir variables de memoria entre hilos

harbour/contrib/mt/mttest9.prg
  • Muestra como usar el mismo alias entre distintos hilos, usando  estas 2 funciones hb_dbRequest y hb_dbDetach.
  • Nota: Un regalo de despedida lo tenéis abajo ;-)

harbour/contrib/mt/mttest10.prg
  • Muestra una consola por thread mostrando un browse

harbour/contrib/mt/mttest11.prg
  • Muestra un ejemplo de un thread asincrono mostrando un reloj

harbour/contrib/mt/mttest12.prg
  • Variables a nivel de hilo, pueden ser declaradas como;
    THREAD STATICt_Var1
    Esta variable es una variable static por cada hilo creado.

Mención especial para el código fuente /contrib/httpd/core.prg, donde podemos ver muchas de estas técnicas explicadas.

De regalo ;-)

Esto fue escrito hace ya 6 años para un sistema para hacer una indexación multihilo, aunque en la mayoría de los casos existe penalización por tema de velocidad de disco, es bueno saber COMO se podria hacer.

En más de 1000 servidores diferentes hemos encontrado que un máxima de 5 hilos por
tabla y un índice a la vez, la ganancia es sustancial en un sistema monolítico.



/*
  Example multiThreads index.
  One thread by table , and one thread by index.
  2010 Rafa Carmona

  Thread Main
       |--------->  Thhread child table for test.dbf
       |                        |----> Thread child index fname
       |                        |
       |                        |----->Thread child index fcode
       |
       |--------->  Thhread child table for test2.dbf
                               |----->Thread child index fname2


*/
#include "hbthread.ch"

proc Main( uCreate )
   Local nSeconds
   Local aFiles  := { "test", "test2" }        // Arrays files dbf
   Local aNtx    := { { "fname", "fcode" },;   // files index for test
                      { "fName2" } }           // files index for test2
   Local aExpr   := { { "name", "code" },;
                      { "dtos(fecha)+str(code)" }  }     // Expresions
   Local cDbf

   if empty( lCreate )
      lCreate := "0"
   endif

   setmode( 25,80 )
   cls

   if uCreate = "1"
      ? "Create test.dbf and test2.dbf"
      dbCreate("test",{ {"name","C",1,0 },{"code","N",7,0 } } )
      use test
      while lastRec() < 1000000
            dbAppend()
            field->name := chr( recno() )
            field->code := recno()
      enddo
      close
      dbCreate("test2",{ {"fecha","D",8,0 },{"code","N",7,0 } } )
      use test2
      while lastRec() < 1000000
            dbAppend()
            field->fecha := date() + recno()
            field->code := recno()
      enddo
      close
   endif

   cls
   // Threads
   nSeconds := Seconds()
   for each cDbf in aFiles
      ? "Process.: " + cDbf
      hb_threadStart( @aCreateIndexe(), cDbf, aNtx[ cDbf:__enumindex ],
aExpr[ cDbf:__enumindex ], cDbf:__enumindex  )
   next

   ? "Wait for threads ...."
   hb_threadWaitForAll()

   ? hb_valTostr( Seconds() - nSeconds )

   ? "finish"

return

function aCreateIndexe( cFile, aNtx, aExpr, nPosDbf )
      Local nContador := 1
      Local cFileNtx, cExpr
      Local nLong := Len( aNtx )
      Local aThreads := {}
      Local cAlias

      use ( cFile )
      cAlias := alias()
      hb_dbDetach( cAlias )  // Libero el alias

      for each cFileNtx in aNtx
          cExpr  := aExpr[ cFileNtx:__enumindex ]
          nContador := 1
          nPos := cFileNtx:__enumindex
          aadd( aThreads, hb_threadStart( @crea(), cAlias,cExpr,
cFileNtx, nPos, nPosDbf ) )
      next

      aEval( aThreads, { |x| hb_threadJoin( x ) } )  // wait threads childs
      hb_dbRequest( cAlias, , , .T.)  // Restaura el alias
      close

RETURN NIL

proc crea( cAlias, cExpr, cFileNtx, nPos, nPosDbf )
     Local nContador := 1

      hb_dbRequest( cAlias, , , .T.)  // Restaura el alias

      INDEX ON &(cExpr) TO &(cFileNtx) ;
            EVAL {|| hb_dispOutAt( nPosDbf, iif( nPos = 1, 20, 40 ),
alltrim( hb_valtostr( nContador) ), "GR+/N" ), nContador += INT(
LASTREC() / 100 ) , .T. } ;
            EVERY INT( LASTREC() / 100 )

      hb_dbDetach( cAlias )          // Libera el alias


return

Sincronización de Threads. Subscribe & Notify


Subscribe  & Notify

Anteriormente vimos un mutex trabajando como un semáforo, ahora veremos que es eso de la suscripción de mensajes.

hb_mutexSubscribe( <pMtx>, [ <nTimeOut> ] [, @<xSubscribed> ] ) -> <lSubscribed>

Dentro de un thread,  suscribimos el thread para ser avisado por una notificación, que  lógicamente, vendrá desde otro hilo distinto, por lo tanto , hb_mutexSubscribe y hb_mutexNotify corren en hilos distintos, y lo que comparten en común es un mutex., <pMtx>.

Si hay notificaciones pendientes, continua la ejecución del thread, si no, estará suspendido a la espera de una notificación, que puede especificar cuantos segundos esperaremos a ser
notificados, <nTimeOut>, por defecto, se queda indefinidamente.
En la variable pasada por referencia, @<xSubscribed>, vamos a obtener el valor que nos
envía desde la notificación. Retorna si tuvo éxito la subscripción al mutex o no, <lSubscribed>

hb_mutexSubscribeNow( <pMtx>, [ <nTimeOut> ] [, @<xSubscribed> ] ) -> <lSubscribed>
El funcionamiento es similar hb_mutexSubscribe, la única diferencia es que antes de comenzar, borra todas las notificaciones pendientes.

Antes de ver un par de ejemplos, vamos a seguir explicando con  hb_mutexNotify.

hb_mutexNotify( <pMtx> [, <xVal>] ) -> NIL
Emite una notificación a todos los threads esperando que están suscritos al mutex, <pMtx>.
Pero, por cada notificación sólo uno de los threads responderá, por lo tanto, tendremos que ir
lanzando notificaciones para que diferentes threads vayan respondiendo.
Si no hay threads a la espera, se van quedando en una cola, que serán enviados conforme se vayan suscribiendo los threads que queramos.
Podemos enviar un parámetro , <xVal>, que como dijimos posteriormente, será recibido por la variable pasada por referencia en hb_mutexSubscribe,  @<xSubscribed>
A tener en cuenta que no existe ningún tipo de relación entre el orden de suscripción y el orden de notificación.

Hay un ejemplo de esto en harbour/contrib/mt/mttest07.prg  donde podéis observar lo aquí explicado, y aunque es ‘duro’ de entender si no sabemos nada, os pongo una versión ‘modificada’ con anotaciones y con pausas, para poder observar que está realizando y observar el comportamiento para intentar asimilarlo con tranquilidad ;-)
En el ejemplo de Harbour, podéis observar como se autoalimentan el hijo del padre y el padre del hijo a través de 2 mutex.

#define N_THREADS 2
#define N_JOBS    5

static s_mtxJobs, s_Result := 0

proc main()
  local aThreads, i, nDigit
  cls

  ? "Main start"

  aThreads := {}
  s_mtxJobs := hb_mutexCreate()

  ? "Arrancamos Thread. "
  ? "Cuando se arranquen los hilos, estos estaran suspendidos a la espera"
  ? "de la notificacion"
  ? "Pulsa una tecla para continuar"
  inkey( 0)

  for i := 1 to N_THREADS
     ? "Thread <" + hb_ntos( i ) + ">"
     aadd( aThreads, hb_threadStart( @thFunc() ) )
  next
  
  ? "Pulsa tecla para empezar a enviar Notificaciones"
  ? "En cada Notificacion, veras que se ejecuta <Run Thread by Nofify> que"
  ? "parte debajo de hb_mutexSubscribe en la funcion thFunc()"
  ? ""
   inkey(0)

  nDigit := 1
  for i := 1 to N_JOBS
     ? "Notifica Job: <" + hb_ntos( i ) + "> Pulsa una tecla para continuar"
     hb_mutexNotify( s_mtxJobs, nDigit )
    inkey( 0 )
     nDigit++
  next
 

  ? "Ahora vamos a enviar NIL, para salir del bucle while de la funcion thFunc()."
  for i := 1 to N_THREADS
     hb_mutexNotify( s_mtxJobs, NIL )
     //?? "<" + hb_ntos( i ) + ">"
  next

  ? "Esperando a los threads..."
  aEval( aThreads, {| x | hb_threadJoin( x ) } )
  ? "Threads se unieron al principal"
   
  ? "Value Total:", s_Result
  ? "End of main"
return

proc thFunc()
  local xJob

  while .T.
     ? "Thread Subscribe: " + "0x" + hb_NumToHex( hb_threadSelf() )
     hb_mutexSubscribe( s_mtxJobs,, @xJob )
     ? "Run Thread by Nofify"
     if xJob == NIL
         ?? "..... exit thread......."
        exit
     endif
    
     // Si no protejo la variable, el valor final puede tener un valor inesperado
     hb_mutexLock( s_mtxJobs )
     s_Result += xJob
     hb_mutexUnLock( s_mtxJobs )
    
  enddo

return


hb_mutexNotifyAll( <pMtx> [, <xVal>] ) -> NIL
Emite una notificación a todos los threads que estén a la espera.
Si no hay threads, no se ejecuta funcion alguna, no se agregan ni se quitan notificaciones que estuviesen en la cola.

#define N_THREADS 5

static s_mtxJobs

proc main()
  local aThreads, i
  cls

  ? "Main start"

  aThreads := {}
  s_mtxJobs := hb_mutexCreate()

  for i := 1 to N_THREADS
     aadd( aThreads, hb_threadStart( @thFunc() ) )
  next

  ? "Ahora vamos a enviar notificacion a todos los hilos."
  inkey( 1 )
  hb_mutexNotifyAll( s_mtxJobs )
  
  ? "Esperando a los threads..."
  aEval( aThreads, {| x | hb_threadJoin( x ) } )
  ? "Threads se unieron al principal"

  ? "End of main"
return

proc thFunc()

  ? "Thread Subscribe: " + "0x" + hb_NumToHex( hb_threadSelf() )
  hb_mutexSubscribe( s_mtxJobs )
  ? "Run Thread by Nofify & dead " + "0x" + hb_NumToHex( hb_threadSelf() )
  
return



Y por último, vemos esta función;
hb_mutexEval( <pMtx>, <bCode> | <@sFunc()> [, <params,...> ] ) -> <xCodeResult>
Aparentemente, lo que hace es ejecutar un codeblock , protegiendo el contenido entre hilos.

KOTLIN. Property , esa cosa que no es un field.

KOTLIN. Property , esa cosa que no es un field. Haciendo una pausa en la documentación, me pongo a experimental el crear / pasar mi...