¿Qué hay más allá de nuestro modelo?

Exploramos una arquitectura para una solución de software completa donde desplegar un modelo de Python, desde la interfaz de usuario hasta la conexión con el modelo y la obtención de los resultados

Background

Por mucho tiempo y sobre todo en nuestra etapa de exploración en el campo de los desafíos de machine learning nos centramos en la búsqueda de un modelo que partiendo de los datos en determinado estado, pueda actuar para obtener un resultado, y que ese resultado se encuentre en los rangos de mediciones que nos planteamos como aceptables.

Bien, el campo de trabajo para la obtención del modelo es tan extenso, complejo y fascinante por si mismo que puede hacernos perder un poco de vista el resto de los componentes que intervienen en una solución de software.

Por tanto a nuestro modelo lo van a rodear otro conjunto de problemas tan complejos, extensos y fascinantes como el propio modelo que hemos desarrollado, por ejemplo:

  • La extracción y transformación de datos para tener un dataset similar al que partimos
  • El deploy del modelo en producción
  • La toma de datos y validación posterior para monitorear y corregir nuestro modelo inicial.

Vamos a encontrarnos entonces con los campos de ETL, Data Wrangling y MLOps.

Como científicos de datos, ya no alcanza con tratar de encontrar un modelo que resuelva el desafío, tenemos que pensar al menos en la viabilidad de todas estas áreas relacionadas. Supongamos por un momento que logramos asegurarnos la cadena de obtención de datos, que nos defendemos durante nuestro pipeline de feature engineering de casos nuevos o datos anómalos, que conseguimos una forma eficiente de ponerlo en producción y que almacenamos el dataset antes y después de cada nueva predicción para poder monitorear el modelo .. ¡uff! … parece que tenemos todo previsto. ¿Cierto?

¡Posiblemente no!

Puede que nos falte un software que interactúe con el usuario para presentarle las predicciones realizadas. En ocasiones nuestro modelo va a conectarse con entornos complejos y es necesario crear una API entre ambos para que el modelo pueda desplegar sus resultados, en ocasiones será necesario desarrollar un software para que el usuario final pueda usar nuestro modelo.

En este artículo vamos a presentar un ensayo desarrollado a finales del año pasado, los componentes y herramientas utilizados por supuesto que podrían ser cambiados por cualquier otro tipo de lenguaje, librería y arquitectura.

El caso de uso

Partimos de un modelo desarrollado en Python, que recibe un CSV y devuelve un archivo de Excel, agregando al final una columna con el resultado de la predicción.

Precisamos desplegar en un servidor este modelo, y desarrollar una interfaz de usuario que:

  • Pida y valide el usuario.
  • Le permita al usuario seleccionar un archivo CSV de su equipo en el formato adecuado y subirlo al modelo.
  • Despliegue al usuario un panel con todos los archivos subidos y su estatus (procesado, fallo, en proceso).
  • Le informe al usuario cuando un modelo se termine de procesar.

Adicionalmente vamos a envolver el modelo en una API para que pueda ser consumido como servicio por otro software.


Arquitectura de la solución

La solución propuesta consta de seis componentes y tres puntos de acceso.

Los componentes son en orden de aparición:

1) Interfase de usuario

2) Base de datos (MySQL)

3) Repositorio en el servidor para guardar los datasets de entrada y salida

4) Scheduler (cron) para agendar la búsqueda de trabajos pendientes e invocar al modelo

5) API para la ejecución del modelo (Pyhton — Uvicorn — Starlette)

6) Modelo de machine learning en Pyhton

Los puntos de acceso al sistema son:

a) El usuario al subir un nuevo dataset a la aplicación

b) El cron al detectar un trabajo pendiente de procesar

c) Alguien con las credenciales correctas desea invocar específicamente la API del modelo para procesar directamente un dataset registrado en la base de datos

La idea es que un usuario suba a través de la interfase un CSV para ser procesado, el componente 1 permite eso y controla que el CSV sea correcto, graba en la base de datos un trabajo en status “pendiente” y deposita en el repositorio de entrada el dataset.

Luego, el cron revisa cada “x” tiempo que exista algún trabajo pendiente, lo toma y cambia a un estado “en proceso”, invocando la función que invoca al modelo. El resultado del proceso es depositado en el repositorio de salida, actualizando el status de la tarea a “Procesado”.

Desde el panel de usuario (punto 1) se pueden detectar trabajos con errores y quitarlos de la lista de tareas, archivarlos o reintentarlo

Los componentes

1 — Panel de usuario

El panel de usuario fue desarrollado en WordPress®, mediante un plugin se confecciono el acceso y manejo de usuarios:

Luego del acceso el panel cuenta con un cabezal donde el usuario puede seleccionar un dataset, el cual es validado en su estructura, antes de cargarlo en la base de datos como pendiente, y dejarlo en el repositorio de entrada. Estos procesos fueron desarrollados en PHP y WordPress pero pudieron ser desarrollados en cualquier otro ambiente de programacion web.

2 — Base de datos

La base de datos seleccionada fue MySQL.

La tabla sobre la que se va dejando registro de las tareas tiene la estructura aquí observada.

3 — Repositorio de datos

El repositorio de entrada y salida son dos directorios definidos en el servidor donde van quedando los archivos conforme se generan entradas en la tabla de tareas.

Una vez que el modelo de Python procesa un dataset y devuelve un CSV con el resultado, el mismo es grabado en un directorio de salida.

4 — Scheduler de ejecución (cronJobs)

El siguiente código es orientativo de como procesar los archivos, abre la base de datos (abro_db), busca si existen tareas en el estado “PENDIENTE” y en caso de haberlas, toma la primera de ellas e invoca el proceso de predicción (proceso_archivo).

Previamente cheuea si hay alguna tarea ejecutándose aún y en tal caso no continúa. Por supuesto que todas estas decisiones y reglas de negocio son ajustables al caso de uso.

import mysql.connector as mysql
from mysql.connector import Error
from datetime import datetime
from jproperties import Properties
if __name__ == '__main__':
   resultado = ''
   try:
      db = abro_bd()
      cursor = db.cursor()
      sql_Query = "SELECT id FROM wp_registros WHERE status_file='EN PROCESO'"
      cursor.execute(sql_Query)
      records = cursor.fetchall()
      if (len(records) != 0):
          resultado = 'Hay archivos en proceso ' + str(records[0][0])
      else:
          sql_Query = "SELECT id FROM wp_registros WHERE status_file='PENDIENTE'"
          cursor.execute(sql_Query)
          records = cursor.fetchall()
          if (len(records) > 0):
             elid = records[0][0]
             resultado = proceso_archivo(elid)
      except Error as e:
          resultado = 'Error ' + e
      finally:
          if (db.is_connected()):
             db.close()
             cursor.close()
             resultado = resultado + ' (BD cerrada)'

El código para abrir la base de datos, que cuenta con un archivo de configuración ConfigFile.properties donde se define la base da datos y las credenciales de acceso.

def abro_bd():
   configs = Properties()
   with open('ConfigFile.properties', 'rb') as config_file:
      configs.load(config_file)
      elhost = configs.get("HOST")[0]
      eluser = configs.get("USER")[0]
      lapass = configs.get("PASSWORD")[0]
      ladatabase = configs.get("DBNAME")[0]
      
      db = mysql.connect(
           host = elhost,
           user = eluser,
           passwd = lapass,
           database=ladatabase)
      return db

El código para procesar un archivo específico (una id de la base de datos podría ser algo así):

def proceso_archivo(elid):
   os.environ['TZ'] = 'America/Montevideo'
   time.tzset()
   resultado = ''
   try:
      db = abro_bd()
      cursor = db.cursor()
      sql_Query = "SELECT path_up FROM dw_registros WHERE id=%s"
      id = (elid,)
      cursor.execute(sql_Query, id)
      record = cursor.fetchone()
      archivo = str(record[0])
      upd_Query = "UPDATE dw_registros SET status_file = 'EN PROCESO',date_init=%s WHERE id=%s"
      hoy = datetime.today()
      params = (str(hoy),elid,)
      cursor.execute(upd_Query, params)
      db.commit()
      salida = ''
      salida = predictions.prediccion(archivo)
      hoy = datetime.today()
      upd_Query = "UPDATE dw_registros SET status_file='PROCESADO OK',path_result = %s,date_result=%s WHERE id=%s"
      params = (salida,str(hoy),elid,)
      cursor.execute(upd_Query, params)
      db.commit()
      resultado = 'OK - Archivo generado [' + salida + ']'
   except Error as e:
      resultado = 'Error ' + e + ' procesando el id ' +  elid
   finally:
      if (db.is_connected()):
         db.close()
         cursor.close()
         resultado = resultado + ' (BD cerrada)'
         return resultado

5 — La API de Uvicorn y Starlette

El siguiente es un código Python para desarrollar una API utilizando Uvicorn y Starlette, los cuales deben ser instalados (mediante pip install).

En nuestra API desplegamos 3 metodos:

  • / para verificar que el servidor esta corriendo y nuestra API funcionando
  • /process_file?id=id_base_datos es un método para procesar determinado archivo, debemos pasarle por parámetro el id de la tabla de tareas correspondiente al archivo a procesar (recordemos que en el componente 1 fueron creadas las entradas en esta tabla y descargados los datasets en el repositorio)
  • /process_all sin parámetros sirve para buscar en todas las entradas alguna pendiente y procesarla
from starlette.applications import Starlette
from starlette.responses import JSONResponse
import uvicorn
import connect
import proceso_pendientes
from datetime import datetime
app = Starlette(debug=True)
@app.route('/')
async def homepage(request):
   return JSONResponse({'MySAMPLE': 'It works!'})
@app.route("/process_file", methods=["GET"])
   async def process_file(request):
   elid = request.query_params["id"]
   antes = datetime.today()
   resultado = connect.proceso_archivo(elid)
   despues = datetime.today()
   return JSONResponse({
      "resultado": resultado,
      "Inicio" : str(antes),
      "Fin" : str(despues)})
@app.route("/process_all", methods=["GET"])
async def process_all(request):
   antes = datetime.today()
   resultado = proceso_pendientes.recorro_archivos(lohago)
   despues = datetime.today()
   return JSONResponse({
      "resultado": resultado,
      "Inicio" : str(antes),
      "Fin" : str(despues)})
if __name__ == '__main__':
   uvicorn.run(app, host='0.0.0.0', port=8000)

El código que recorre los pendientes y el que procesa un archivo se especificó en el punto 4.

6 — El modelo de Python

El modelo se encuentra en otro módulo de la aplicación y no es desplegado en este artículo dado que no es nuestro objetivo detallarlo aquí.


Consideraciones finales

A diferencia de artículos anteriores donde nos centramos en las herramientas y abordajes de un desafío específico de Machine Learning, en esta ocasión hemos expuesto otra parte del esfuerzo; quizás no tan atractiva pero si muy necesaria para transformar una solución a un caso de uso, en una solución completa.

Por supuesto que la arquitectura de cada solución se define en función del problema, la estructura ya existente en el cliente, el conocimiento que tengamos de las herramientas para cada instancia o de las personas que complementan nuestro equipo de trabajo. Existen gran cantidad de librerías y herramientas para cada etapa.

En resumen: desplegar sobre la mesa otros aspectos cercanos a un modelo que pueden ser necesarios para una solución completa.

Shere this:

Deja un comentario