# -*- coding: utf-8 -*-
# @Time : 2023/7/18 09:20
# @Author : Ultipa
# @Email : [email protected]
# @File : connectionBase.py
import logging
import time
import schedule
from ultipa import ParameterException
from ultipa.configuration.UltipaConfig import UltipaConfig
from ultipa.configuration.RequestConfig import RequestConfig
from ultipa.connection.clientInfo import ClientInfo
from ultipa.connection.clientType import ClientType
from ultipa.connection.hostManagerControl import HostManagerControl, RAFT_GLOBAL
from ultipa.connection.uqlHelper import UQLHelper
from ultipa.proto import ultipa_pb2
from ultipa.utils import CommandList
from ultipa.utils.common import GETLEADER_TIMEOUT
from ultipa.utils.format import FormatType
from ultipa.utils.logger import LoggerConfig
from ultipa.types import ULTIPA, ULTIPA_RESPONSE
from ultipa.utils.ultipaSchedule import run_continuously
[docs]
class ConnectionBase:
'''
A base class that defines settings for an Ultipa connection.
'''
def __init__(self, host: str, defaultConfig: UltipaConfig, crtFilePath: str = None):
self.host = host
self.username = defaultConfig.username
self.password = defaultConfig.password
self.crtPath = crtFilePath
self.defaultConfig = defaultConfig
self.runSchedule: object = None
self.crt = None
if crtFilePath:
try:
with open(f'{crtFilePath}', 'rb') as f:
self.crt = f.read()
except Exception as e:
raise ParameterException(err=e)
self.hostManagerControl = HostManagerControl(self.host, self.username, self.password,
self.defaultConfig.maxRecvSize, self.crt,
consistency=defaultConfig.consistency)
self.defaultConfig.defaultGraph = defaultConfig.defaultGraph or "default"
self.defaultConfig.timeoutWithSeconds = defaultConfig.timeoutWithSeconds or 15
self.defaultConfig.responseWithRequestInfo = defaultConfig.responseWithRequestInfo or False
self.defaultConfig.consistency = defaultConfig.consistency
self.graphSetName = self.defaultConfig.defaultGraph
self.count = 0
if not self.defaultConfig.uqlLoggerConfig and self.defaultConfig.Debug:
self.defaultConfig.uqlLoggerConfig = LoggerConfig(name="ultipa", fileName="",
isStream=self.defaultConfig.Debug, isWriteToFile=False,
level=logging.INFO)
[docs]
def getGraphSetName(self, currentGraphName: str, uql: str = "", isGlobal: bool = False):
if isGlobal:
return RAFT_GLOBAL
if uql:
parse = UQLHelper(uql)
if parse.uqlIsGlobal():
return RAFT_GLOBAL
c1 = parse.parseRet.getFirstCommands()
c2 = f"{c1}().{parse.parseRet.getSecondCommands()}"
if c2 in [CommandList.mount, CommandList.unmount, CommandList.truncate]:
graphName = parse.parseRet.getCommandsParam(1)
if graphName:
return graphName
return currentGraphName or self.defaultConfig.defaultGraph
[docs]
def getTimeout(self, timeout: int):
return timeout or self.defaultConfig.timeoutWithSeconds
[docs]
def getClientInfo(self, clientType: int = ClientType.Default, graphSetName: str = '', uql: str = '',
isGlobal: bool = False, ignoreRaft: bool = False, useHost: str = None, useMaster: bool = False,
timezone=None, timeZoneOffset=None):
goGraphName = self.getGraphSetName(currentGraphName=graphSetName, uql=uql, isGlobal=isGlobal)
if not ignoreRaft and not self.hostManagerControl.getHostManger(goGraphName).raftReady:
refreshRet = self.refreshRaftLeader(self.hostManagerControl.initHost,
RequestConfig(graphName=goGraphName))
self.hostManagerControl.getHostManger(goGraphName).raftReady = refreshRet
clientInfo = self.hostManagerControl.chooseClientInfo(type=clientType, uql=uql, graphSetName=goGraphName,
useHost=useHost, useMaster=useMaster)
metadata = clientInfo.getMetadata(goGraphName, timezone, timeZoneOffset)
return ClientInfo(Rpcsclient=clientInfo.Rpcsclient, Controlsclient=clientInfo.Controlsclient, metadata=metadata,
graphSetName=goGraphName, host=clientInfo.host)
[docs]
def getRaftLeader(self, requestConfig: RequestConfig = RequestConfig()):
resRaftLeader = self.__autoGetRaftLeader(host=self.host, requestConfig=requestConfig)
RaftStatus = FormatType.getRaftStatus(resRaftLeader)
return ULTIPA_RESPONSE.Response(RaftStatus)
def __getRaftLeader(self, requestConfig: RequestConfig = RequestConfig()):
if requestConfig == None:
graphSetName = None
else:
if not requestConfig.graphName:
graphSetName = 'default'
else:
graphSetName = requestConfig.graphName
clientInfo = self.getClientInfo(clientType=ClientType.Leader, graphSetName=graphSetName, ignoreRaft=True,
useMaster=requestConfig.useMaster)
res = clientInfo.Controlsclient.GetLeader(ultipa_pb2.GetLeaderRequest(), metadata=clientInfo.metadata,
timeout=GETLEADER_TIMEOUT)
return FormatType.Response(_res=res, host=self.host)
def __autoGetRaftLeader(self, host: str, requestConfig: RequestConfig, retry=0):
'''For internal use, return customized value'''
conn = ConnectionBase(host=host, crtFilePath=self.crtPath, defaultConfig=self.defaultConfig)
try:
res = conn.__getRaftLeader(requestConfig)
except Exception as e:
self.hostManagerControl.initHost = conn.host
if host in self.defaultConfig.hosts:
self.defaultConfig.hosts.remove(host)
return {
"code": ULTIPA.Code.FAILED,
"message": str(e._state.code) + ' : ' + str(e._state.details)
}
status = res.status
if status.code == ULTIPA.Code.SUCCESS:
# status.clusterInfo.raftPeers.remove(host) # remove leader
self.hostManagerControl.initHost = host
for i in status.clusterInfo.raftPeers:
if i.host == host:
status.clusterInfo.raftPeers.remove(i)
return {
"code": status.code,
"message": status.message,
'leaderHost': host,
"followersPeerInfos": list(filter(lambda x: x != host, status.clusterInfo.raftPeers)),
"leaderInfos": status.clusterInfo.leader,
}
elif status.code == ULTIPA.Code.NOT_RAFT_MODE:
return {
"code": status.code,
"message": status.message,
"leaderHost": host,
"followersPeerInfos": [],
"leaderInfos": status.clusterInfo.leader
}
elif status.code in [ULTIPA.Code.RAFT_REDIRECT, ULTIPA.Code.RAFT_LEADER_NOT_YET_ELECTED,
ULTIPA.Code.RAFT_NO_AVAILABLE_FOLLOWERS, ULTIPA.Code.RAFT_NO_AVAILABLE_ALGO_SERVERS]:
if retry > 2:
return {
"code": status.code,
"message": status.message,
"redirectHost": res.status.clusterInfo.redirect
}
if status.code != ULTIPA.Code.RAFT_REDIRECT:
time.sleep(0.3)
if status.code == ULTIPA.Code.RAFT_REDIRECT:
host = res.status.clusterInfo.redirect
return self.__autoGetRaftLeader(host=host, requestConfig=requestConfig, retry=retry + 1)
return {
"code": status.code,
"message": status.message
}
[docs]
def refreshRaftLeader(self, redirectHost: str, requestConfig: RequestConfig):
# hosts = [redirectHost] if redirectHost else self.hostManagerControl.getAllHosts()
hosts = [redirectHost] if redirectHost else []
goGraphName = self.getGraphSetName(requestConfig.graphName)
for h in self.defaultConfig.hosts:
if h not in hosts:
hosts.append(h)
# print("host:", hosts)
for host in hosts:
resRaftLeader = self.__autoGetRaftLeader(host=host, requestConfig=requestConfig)
code = resRaftLeader["code"]
if code == ULTIPA.Code.SUCCESS:
leaderHost = resRaftLeader["leaderHost"]
followersPeerInfos = resRaftLeader["followersPeerInfos"]
leaderInfos = resRaftLeader["leaderInfos"]
hostManager = self.hostManagerControl.upsetHostManger(goGraphName, leaderHost)
hostManager.setClients(leaderHost=leaderHost, followersPeerInfos=followersPeerInfos,
leaderInfos=leaderInfos)
return True
# elif code == ULTIPA.Code.RAFT_REDIRECT:
# return False
return False
[docs]
def stopConnectionAlive(self):
if self.runSchedule != None:
self.runSchedule.set()
[docs]
def keepConnectionAlive(self, timeIntervalSeconds: int = None):
timeIntervalSeconds = self.defaultConfig.heartBeat if timeIntervalSeconds == None else timeIntervalSeconds
def test_allconn():
goGraphName = self.defaultConfig.defaultGraph
for host in self.hostManagerControl.getAllClientInfos(goGraphName):
res = host.Controlsclient.SayHello(ultipa_pb2.HelloUltipaRequest(name="test"),
metadata=host.getMetadata(goGraphName, None, None))
# print(host.host,res.message)
if self.defaultConfig.uqlLoggerConfig is None:
self.defaultConfig.uqlLoggerConfig = LoggerConfig(name="HeartBeat", fileName=None,
isWriteToFile=False, level=logging.WARN,
isStream=True)
self.defaultConfig.uqlLoggerConfig.getlogger().info(f"HeartBeat:{host.host}--{res.message}")
schedule.every().second.do(test_allconn)
self.runSchedule = run_continuously(timeIntervalSeconds)