Source code for ultipa.connection.connectionPoolMaker

# -*- coding: utf-8 -*-

import threading
import time
import queue


[docs] class TooManyConnections(Exception): ...
[docs] class Expired(Exception): ...
[docs] class UsageExceeded(Expired): ...
[docs] class TtlExceeded(Expired): ...
[docs] class IdleExceeded(Expired): ...
[docs] class WrapperConnection(object): def __init__(self, pool, connection): self.pool = pool self.connection = connection self.usage = 0 self.last = self.created = time.time()
[docs] def using(self): self.usage += 1 self.last = time.time() return self
[docs] def reset(self): self.usage = self.last = self.created = 0
def __enter__(self): return self.connection def __exit__(self, *exc_info): self.pool.release(self)
[docs] class ConnectionPool(): ''' Connection pool ''' __wrappers = {} def __init__(self, create, close=None, max_size=3, max_usage=0, ttl=0, idle=120, block=True): if not hasattr(create, '__call__'): raise ValueError('"create" argument is not callable') if close is not None and not hasattr(close, '__call__'): raise ValueError('"close" argument is not callable') self._create = create self._close = close self._max_size = int(max_size) self._max_usage = int(max_usage) self._ttl = int(ttl) self._idle = int(idle) self._block = bool(block) self._lock = threading.Condition() self._pool = queue.Queue() self._size = 0 if self._pool.qsize() == 0: while self._max_size: self._pool.put_nowait(self._wrapper(self._create())) self._max_size -= 1
[docs] def item(self): self._lock.acquire() try: while (self._max_size and self._pool.empty() and self._size >= self._max_size): if not self._block: raise TooManyConnections('Too many connections') self._lock.wait() # Waiting for idle connections if self._pool.qsize() == 0: while self._max_size: self._pool.put_nowait(self._wrapper(self._create())) self._max_size -= 1 try: wrapped = self._pool.get_nowait() # Get an idle connection from the connection pool if self._idle and (wrapped.last + self._idle) < time.time(): self._destroy(wrapped, isCreate=True) raise IdleExceeded('Idle exceeds %d secs' % self._idle) except (queue.Empty, IdleExceeded): wrapped = self._wrapper(self._create()) # Create new connection self._size += 1 finally: self._lock.release() return wrapped.using()
[docs] def release(self, conn): self._lock.acquire() wrapped = self._wrapper(conn) try: self._test(wrapped) except Expired: self._destroy(wrapped) else: self._pool.put_nowait(wrapped) self._lock.notifyAll() # Notify the rest of threads on the availability of idle connection finally: self._lock.release()
def _destroy(self, wrapped, isCreate: bool = False): if self._close: self._close(wrapped.connection) self._unwrapper(wrapped) self._size += 1 if isCreate: self._pool.put_nowait(self._wrapper(self._create())) self._size -= 1 def _wrapper(self, conn): if isinstance(conn, WrapperConnection): return conn _id = id(conn) if _id not in self.__wrappers: self.__wrappers[_id] = WrapperConnection(self, conn) return self.__wrappers[_id] def _unwrapper(self, wrapped): if not isinstance(wrapped, WrapperConnection): return _id = id(wrapped.connection) wrapped.reset() del wrapped if _id in self.__wrappers: del self.__wrappers[_id] def _test(self, wrapped): if self._max_usage and wrapped.usage >= self._max_usage: raise UsageExceeded('Usage exceeds %d times' % self._max_usage) if self._ttl and (wrapped.created + self._ttl) < time.time(): raise TtlExceeded('TTL exceeds %d secs' % self._ttl)