Source code for ultipa.operate.task_extra

from ultipa.configuration.RequestConfig import RequestConfig
from ultipa.operate.base_extra import BaseExtra
from ultipa.types import ULTIPA_REQUEST, ULTIPA, ULTIPA_RESPONSE
from ultipa.utils import UQLMAKER, CommandList
import json

from ultipa.utils.convert import convertToAnyObject
from ultipa.utils.ResposeFormat import ResponseKeyFormat


class ALGO_RETURN_TYPE:
    '''
    Bit flags that are tested against a task's numeric ``return_type`` value.

    Each constant occupies its own bit, so several of them can be set in
    one integer and checked individually with ``&``.
    '''
    ALGO_RETURN_REALTIME = 1 << 0       # 1
    ALGO_RETURN_WRITE_BACK = 1 << 1     # 2
    ALGO_RETURN_VISUALIZATION = 1 << 2  # 4
class TaskExtra(BaseExtra):
    '''
    Processing class that defines settings for task and process related operations.

    Every public method accepts an optional ``requestConfig``. When it is
    omitted a new ``RequestConfig()`` is created for that call, so no
    configuration object is shared between unrelated calls.
    '''

    @staticmethod
    def _taskFilterParams(request):
        '''
        Build the command parameters used by showTask and clearTask.

        Args:
            request: An object exposing ``id``, ``name`` and ``status``

        Returns:
            ``request.id`` when it is truthy; otherwise ``[name, status]``
            with ``'*'`` substituted for whichever of the two is missing;
            an empty list when neither name nor status is given.
        '''
        if request.id:
            # An explicit id replaces any name/status filter.
            return request.id
        name, status = request.name, request.status
        if not name and not status:
            return []
        return [name or '*', status or '*']

    @staticmethod
    def _buildTask(row: dict):
        '''
        Convert one raw row of a show-task reply into a ULTIPA_RESPONSE.Task.

        Args:
            row: The attribute dictionary of one returned row; its
                ``taskJson`` entry is read as a dictionary

        Returns:
            Task
        '''
        task = ULTIPA_RESPONSE.Task()
        taskJson = row.get("taskJson", {})
        task.param = json.loads(taskJson.get("param", "{}"))
        task.result = taskJson.get("result")

        task_info = taskJson.get("task_info", {})
        if task_info.get('status_code'):
            # NOTE(review): the guard checks 'status_code' but the lookup key
            # is read from 'TASK_STATUS' - confirm which key the server sends.
            task_info["status_code"] = ULTIPA.TaskStatusString[task_info.get("TASK_STATUS")]
        if task_info.get('engine_cost'):
            # Replaces the reported value with writing_start_time - start_time.
            task_info["engine_cost"] = task_info.get("writing_start_time", 0) - task_info.get("start_time", 0)
        task.task_info = convertToAnyObject(task_info)

        # Decode the return_type bit mask into individual boolean flags.
        flags = int(task_info.get('return_type', 0))
        return_type = ULTIPA_RESPONSE.Return_Type()
        return_type.is_realtime = bool(flags & ALGO_RETURN_TYPE.ALGO_RETURN_REALTIME)
        return_type.is_visualization = bool(flags & ALGO_RETURN_TYPE.ALGO_RETURN_VISUALIZATION)
        # 'is_wirte_back' (sic) is the attribute name used by the response type.
        return_type.is_wirte_back = bool(flags & ALGO_RETURN_TYPE.ALGO_RETURN_WRITE_BACK)
        setattr(task.task_info, 'return_type', return_type)
        return task

    def top(self, requestConfig: RequestConfig = None) -> ULTIPA_RESPONSE.ResponseListTop:
        '''
        Top real-time processes.

        Args:
            requestConfig: An object of RequestConfig class; a new default
                instance is created when omitted

        Returns:
            ResponseListTop
        '''
        if requestConfig is None:
            requestConfig = RequestConfig()
        uqlMaker = UQLMAKER(command=CommandList.top, commonParams=requestConfig)
        return self.UqlListSimple(uqlMaker)

    def kill(self, id: str = None, all: bool = False,
             requestConfig: RequestConfig = None) -> ULTIPA_RESPONSE.ResponseCommon:
        '''
        Kill real-time processes.

        Args:
            id: The ID of real-time process

            all: Whether to kill all real-time processes; takes precedence
                over ``id`` when both are given

            requestConfig: An object of RequestConfig class; a new default
                instance is created when omitted

        Returns:
            ResponseCommon
        '''
        if requestConfig is None:
            requestConfig = RequestConfig()
        if all:
            commonP = '*'
        elif id:
            commonP = id
        else:
            commonP = []
        uqlMaker = UQLMAKER(command=CommandList.kill, commonParams=requestConfig)
        uqlMaker.setCommandParams(commonP)
        return self.uqlSingle(uqlMaker)

    def showTask(self, request: ULTIPA_REQUEST.ShowTask,
                 requestConfig: RequestConfig = None) -> ULTIPA_RESPONSE.ResponseListTask:
        '''
        Show back-end tasks.

        Args:
            request: An object of ShowTask class

            requestConfig: An object of RequestConfig class; a new default
                instance is created when omitted

        Returns:
            ResponseListTask
        '''
        if requestConfig is None:
            requestConfig = RequestConfig()
        _jsonKeys = ['taskJson']
        uqlMaker = UQLMAKER(command=CommandList.showTask, commonParams=requestConfig)
        uqlMaker.setCommandParams(commandP=self._taskFilterParams(request))
        res = self.UqlListSimple(uqlMaker=uqlMaker,
                                 responseKeyFormat=ResponseKeyFormat(jsonKeys=_jsonKeys))
        if res.data:
            # Empty or missing data is passed through untouched.
            res.data = [self._buildTask(row.__dict__) for row in res.data]
        return res

    def clearTask(self, request: ULTIPA_REQUEST.ClearTask,
                  requestConfig: RequestConfig = None) -> ULTIPA_RESPONSE.ResponseCommon:
        '''
        Clear back-end tasks.

        Args:
            request: An object of ClearTask class; when ``request.all`` is
                truthy the id/name/status filters are ignored

            requestConfig: An object of RequestConfig class; a new default
                instance is created when omitted

        Returns:
            ResponseCommon
        '''
        if requestConfig is None:
            requestConfig = RequestConfig()
        uqlMaker = UQLMAKER(command=CommandList.clearTask, commonParams=requestConfig)
        if request.all:
            commonP = ['*']
        else:
            commonP = self._taskFilterParams(request)
        uqlMaker.setCommandParams(commandP=commonP)
        return self.UqlUpdateSimple(uqlMaker)

    def stopTask(self, id: str = None, all: bool = False,
                 requestConfig: RequestConfig = None) -> ULTIPA_RESPONSE.ResponseCommon:
        '''
        Stop back-end tasks.

        Args:
            id: The ID of back-end task; takes precedence over ``all`` when
                both are given

            all: Whether to stop all back-end tasks that are computing

            requestConfig: An object of RequestConfig class; a new default
                instance is created when omitted

        Returns:
            ResponseCommon
        '''
        if requestConfig is None:
            requestConfig = RequestConfig()
        uqlMaker = UQLMAKER(command=CommandList.stopTask, commonParams=requestConfig)
        if id:
            commonP = id
        elif all:
            commonP = '*'
        else:
            commonP = []
        uqlMaker.setCommandParams(commandP=commonP)
        return self.UqlUpdateSimple(uqlMaker)

    def clusterInfo(self, requestConfig: RequestConfig = None) -> ULTIPA_RESPONSE.ClusterInfo:
        '''
        Show cluster information.

        Args:
            requestConfig: An object of RequestConfig class; a new default
                instance is created when omitted

        Returns:
            ClusterInfo (the code builds a ULTIPA_RESPONSE.Response whose
            ``data`` is a list of ULTIPA_RESPONSE.Cluster entries, one per
            known host)
        '''
        if requestConfig is None:
            requestConfig = RequestConfig()
        self.refreshRaftLeader(redirectHost='', requestConfig=requestConfig)
        graphSetName = requestConfig.graphName or 'default'

        result = []
        for peer in self.hostManagerControl.getAllHostStatusInfo(graphSetName):
            info = ULTIPA_RESPONSE.Cluster()
            info.status = peer.status
            info.host = peer.host
            info.isLeader = peer.isLeader
            info.isFollowerReadable = peer.isFollowerReadable
            info.isAlgoExecutable = peer.isAlgoExecutable
            info.isUnset = peer.isUnset
            # Usage figures stay None unless the host is up and answers stats().
            info.cpuUsage = None
            info.memUsage = None
            if peer.status:
                ret = self.stats(requestConfig=RequestConfig(host=peer.host))
                if ret.status.code == ULTIPA.Code.SUCCESS:
                    info.cpuUsage = ret.data.cpuUsage
                    info.memUsage = ret.data.memUsage
            result.append(info)

        res = ULTIPA_RESPONSE.Response()
        res.data = result
        return res