Source code for ultipa.operate.algo_extra
import hashlib
import os
from ultipa.configuration.RequestConfig import RequestConfig
from ultipa.operate.base_extra import BaseExtra
from ultipa.proto import ultipa_pb2
from ultipa.structs.Algo import ALGO_RESULT
from ultipa.types import ULTIPA_REQUEST, ULTIPA, ULTIPA_RESPONSE
from ultipa.utils import UQLMAKER, CommandList, errors
from ultipa.utils.fileSize import read_in_chunks
from ultipa.utils.format import FormatType
from ultipa.utils.ResposeFormat import ResponseKeyFormat
[docs]
class AlgoExtra(BaseExtra):
'''
Processing class that defines settings for algorithm related operations.
'''
JSONSTRING_KEYS = ["param"]
[docs]
def showAlgo(self,
requestConfig: RequestConfig = RequestConfig()) -> ULTIPA_RESPONSE.ResponseListAlgo:
'''
Query for algorithm list.
Args:
requestConfig: An object of the RequestConfig class
Returns:
ResponseListAlgo
'''
uqlMaker = UQLMAKER(command=CommandList.showAlgo, commonParams=requestConfig)
res = self.UqlListSimple(uqlMaker=uqlMaker, responseKeyFormat=ResponseKeyFormat(jsonKeys=self.JSONSTRING_KEYS))
if res.status.code == ULTIPA.Code.SUCCESS:
for algo in res.data:
try:
result_opt = int(algo.param.get("result_opt", 0))
except Exception as e:
raise errors.ParameterException(e)
result_opt_obj = ULTIPA_RESPONSE.AlgoResultOpt()
result_opt_obj.can_realtime = True if result_opt & ALGO_RESULT.WRITE_TO_CLIENT else False
result_opt_obj.can_visualization = True if result_opt & ALGO_RESULT.WRITE_TO_VISUALIZATION else False
result_opt_obj.can_write_back = True if result_opt & (
ALGO_RESULT.WRITE_TO_DB | ALGO_RESULT.WRITE_TO_FILE) else False
algo.__setattr__("result_opt", result_opt_obj)
return res
def __make_message(self, filename, md5, chunk):
return ultipa_pb2.InstallAlgoRequest(
file_name=filename, md5=md5, chunk=chunk
)
def __generate_messages(self, request: ULTIPA_REQUEST.InstallAlgo):
messages = []
file_object = open(request.soPath, 'rb')
somd5 = hashlib.md5(file_object.read()).hexdigest()
file_object.close()
for chunk in read_in_chunks(request.soPath):
filename = os.path.basename(request.soPath)
messages.append(self.__make_message(filename, somd5, chunk))
file_object = open(request.configPath, 'rb')
configmd5 = hashlib.md5(file_object.read()).hexdigest()
file_object.close()
for chunk in read_in_chunks(request.configPath):
filename = os.path.basename(request.configPath)
messages.append(self.__make_message(filename, configmd5, chunk))
for msg in messages:
yield msg
[docs]
def installAlgo(self, algoFilePath: str, algoInfoFilePath: str,
requestConfig: RequestConfig = RequestConfig()) -> ULTIPA_RESPONSE.Response:
'''
Install an Ultipa standard algorithm.
Args:
algoFilePath: The directory of the algorithm package (.so)
algoInfoFilePath: The directory of the algorithm configuration (.yml)
requestConfig: An object of the RequestConfig class
Returns:
Response
'''
request = ULTIPA_REQUEST.InstallAlgo(algoInfoFilePath, algoFilePath)
requestConfig.useMaster = True
clientInfo = self.getClientInfo(graphSetName=requestConfig.graphName, useMaster=requestConfig.useMaster,
isGlobal=True)
response = ULTIPA_RESPONSE.Response()
try:
if os.path.exists(request.soPath) and os.path.exists(request.configPath):
installRet = clientInfo.Controlsclient.InstallAlgo(self.__generate_messages(request),
metadata=clientInfo.metadata)
status = FormatType.status(installRet.status)
response.status = status
except Exception as e:
try:
message = str(e._state.code) + ' : ' + str(e._state.details)
except:
message = str(e)
response.status = ULTIPA.Status(code=ULTIPA.Code.UNKNOW_ERROR, message=message)
if self.defaultConfig.responseWithRequestInfo:
response.req = ULTIPA.ReturnReq(self.graphSetName, "InstallAlgo",
requestConfig.useHost if requestConfig.useHost else self.host,
requestConfig.retry,
False)
return response
[docs]
def uninstallAlgo(self, algoName: str,
requestConfig: RequestConfig = RequestConfig()) -> ULTIPA_RESPONSE.Response:
'''
Uninstall an Ultipa standard algorithm.
Args:
algoName: The name of algorithm
requestConfig: An object of the RequestConfig class
Returns:
Response
'''
requestConfig.useMaster = True
clientInfo = self.getClientInfo(graphSetName=requestConfig.graphName, useMaster=requestConfig.useMaster,
isGlobal=True)
arequest = ultipa_pb2.UninstallAlgoRequest(algo_name=algoName)
installRet = clientInfo.Controlsclient.UninstallAlgo(arequest, metadata=clientInfo.metadata)
status = FormatType.status(installRet.status)
response = ULTIPA_RESPONSE.Response(status=status)
if self.defaultConfig.responseWithRequestInfo:
response.req = ULTIPA.ReturnReq(self.graphSetName, "InstallAlgo",
requestConfig.useHost if requestConfig.useHost else self.host,
requestConfig.retry,
False)
return response
[docs]
def installExtaAlgo(self, algoFilePath: str, algoInfoFilePath: str,
requestConfig: RequestConfig = RequestConfig()) -> ULTIPA_RESPONSE.Response:
'''
Install an EXTA algorithm (via gRPC port).
Args:
algoFilePath: The directory of the algorithm package (.so)
algoInfoFilePath: The directory of the algorithm configuration (.yml)
requestConfig: An object of the RequestConfig class
Returns:
Response
'''
request = ULTIPA_REQUEST.InstallAlgo(algoInfoFilePath, algoFilePath)
requestConfig.useMaster = True
clientInfo = self.getClientInfo(graphSetName=requestConfig.graphName, useMaster=requestConfig.useMaster)
response = ULTIPA_RESPONSE.Response()
try:
if os.path.exists(request.soPath) and os.path.exists(request.configPath):
installRet = clientInfo.Controlsclient.InstallExta(self.__generate_messages(request),
metadata=clientInfo.metadata)
status = FormatType.status(installRet.status)
response.status = status
except Exception as e:
try:
message = str(e._state.code) + ' : ' + str(e._state.details)
except:
message = str(e)
response.status = ULTIPA.Status(code=ULTIPA.Code.UNKNOW_ERROR, message=message)
if self.defaultConfig.responseWithRequestInfo:
response.req = ULTIPA.ReturnReq(self.graphSetName, "InstallAlgo",
requestConfig.useHost if requestConfig.useHost else self.host,
requestConfig.retry,
False)
return response
[docs]
def uninstallExtaAlgo(self, algoName: str,
requestConfig: RequestConfig = RequestConfig()) -> ULTIPA_RESPONSE.Response:
'''
Uninstall an EXTA (via gRPC port).
Args:
algoName: The name of algorithm
requestConfig: An object of the RequestConfig class
Returns:
Response
'''
requestConfig.useMaster = True
clientInfo = self.getClientInfo(graphSetName=requestConfig.graphName, useMaster=requestConfig.useMaster)
arequest = ultipa_pb2.UninstallExtaRequest(exta_name=algoName)
installRet = clientInfo.Controlsclient.UninstallExta(arequest, metadata=clientInfo.metadata)
status = FormatType.status(installRet.status)
response = ULTIPA_RESPONSE.Response(status=status)
if self.defaultConfig.responseWithRequestInfo:
response.req = ULTIPA.ReturnReq(self.graphSetName, "InstallAlgo",
requestConfig.useHost if requestConfig.useHost else self.host,
requestConfig.retry,
False)
return response