from typing import List, Optional

from ultipa import Connection, UltipaConfig
from ultipa.connection.connectionBase import ConnectionBase
from ultipa.connection.connectionPoolMaker import ConnectionPool
class UltipaConnectionPool():
    '''
    Ultipa connection pool class.

    Wraps a ``ConnectionPool`` (created with ``max_size=10``) whose
    connections are built by ``Connection.NewConnection``.
    '''

    def __init__(self, defaultConfig: 'Optional[UltipaConfig]' = None):
        '''
        Create the underlying pool.

        Args:
            defaultConfig: configuration forwarded to
                ``Connection.NewConnection(defaultConfig=...)`` by the
                zero-argument callable given to ``ConnectionPool``. When
                omitted, a new ``UltipaConfig()`` is created for this pool.
        '''
        # A ``UltipaConfig()`` default in the signature would be evaluated
        # once, at definition time, and the same object would then be shared
        # by every pool built without an explicit config.
        if defaultConfig is None:
            defaultConfig = UltipaConfig()
        self.pool = ConnectionPool(
            lambda: Connection.NewConnection(defaultConfig=defaultConfig),
            max_size=10,
        )

    def get_conn(self) -> 'ConnectionBase':
        '''
        Obtain a connection from the pool.

        Returns:
            The object yielded by the ``self.pool.item()`` context manager.
        '''
        # NOTE(review): returning from inside the ``with`` means the context
        # manager's exit has already run when the caller receives the object.
        # If ``item()`` hands the connection back to the pool on exit, the
        # caller is holding a connection the pool considers free -- confirm
        # against ConnectionPool before relying on exclusive use.
        with self.pool.item() as ultipa_cli:
            return ultipa_cli

    def __del__(self):
        '''
        Destroy one connection obtained from the pool on finalization.
        '''
        # The finalizer also runs when ``__init__`` raised before assigning
        # ``self.pool``; there is nothing to clean up in that case.
        if getattr(self, 'pool', None) is None:
            return
        self.destroyConn(self.get_conn())

    def destroyConn(self, conn):
        '''
        Destroy a connection through the pool.

        Args:
            conn: the connection to destroy, e.g. one returned by
                ``get_conn``. It is passed through ``self.pool._wrapper``
                and the result is handed to ``self.pool._destroy``.
        '''
        wrapped = self.pool._wrapper(conn)
        self.pool._destroy(wrapped)