Source code for ultipa.operate.base_extra

# -*- coding: utf-8 -*-
# @Time    : 2023/7/18 17:17
# @Author  : Ultipa
# @Email   : [email protected]
# @File    : base_extra.py
import copy
import csv
import json
import types
from typing import List

from ultipa import ParameterException
from ultipa.configuration.RequestConfig import RequestConfig
from ultipa.configuration.InsertConfig import InsertConfig
from ultipa.connection.clientType import ClientType
from ultipa.connection.commonUql import GetPropertyBySchema
from ultipa.connection.connectionBase import ConnectionBase
from ultipa.connection.uqlHelper import UQLHelper
from ultipa.proto import ultipa_pb2
from ultipa.structs import DBType
from ultipa.structs.Schema import Schema
from ultipa.types import ULTIPA, ULTIPA_REQUEST, ULTIPA_RESPONSE
from ultipa.types.types_response import PropertyTable
from ultipa.utils import UQLMAKER, CommandList
from ultipa.utils.ResposeFormat import ResponseKeyFormat
from ultipa.utils.convert import convertTableToDict, convertToListAnyObject
from ultipa.utils.format import FormatType
from ultipa.utils.raftRetry import RetryHelp
from ultipa.utils.ultipa_datetime import getTimeZoneOffset, getTimeOffsetSeconds


[docs] class BaseExtra(ConnectionBase): ''' Processing class that defines settings for basic operations. '''
[docs] def test(self, requestConfig: RequestConfig = RequestConfig()) -> ULTIPA_RESPONSE.Response: ''' Test connection. Args: requestConfig: An object of RequestConfig class Returns: Response ''' testResponse = ULTIPA_RESPONSE.Response() returnReq = ULTIPA.ReturnReq(requestConfig.graphName, "test", None, None, False) try: clientInfo = self.getClientInfo(useHost=requestConfig.useHost, useMaster=requestConfig.useMaster) name = 'Test' res = clientInfo.Controlsclient.SayHello(ultipa_pb2.HelloUltipaRequest(name=name), metadata=clientInfo.metadata) returnReq.host = clientInfo.host if (res.message == name + " Welcome To Ultipa!"): if self.defaultConfig.uqlLoggerConfig: self.defaultConfig.uqlLoggerConfig.getlogger().info(res.message) testResponse.status = ULTIPA.Status(code=res.status.error_code, message=res.status.msg) else: testResponse.status = ULTIPA.Status(code=res.status.error_code, message=res.status.msg) except Exception as e: testResponse = ULTIPA_RESPONSE.Response() try: message = str(e._state.details) except: message = str(e) testResponse.status = ULTIPA.Status(code=ULTIPA.Code.UNKNOW_ERROR, message=message) if self.defaultConfig.responseWithRequestInfo: testResponse.req = returnReq return testResponse
[docs] def stats(self, requestConfig: RequestConfig = RequestConfig()) -> ULTIPA_RESPONSE.ResponseStat: ''' Query for the server statistics. Args: requestConfig: An object of RequestConfig class Returns: ResponseStat ''' uqlMaker = UQLMAKER(command=CommandList.stat, commonParams=requestConfig) ret = self.UqlListSimple(uqlMaker=uqlMaker, isSingleOne=True) if ret.status.code == ULTIPA.Code.SUCCESS: ret.data = ret.data[0] return ret
[docs] def exportData(self, request: ULTIPA_REQUEST.Export, requestConfig: RequestConfig = RequestConfig()): try: req = ultipa_pb2.ExportRequest(db_type=request.type, limit=request.limit, select_properties=request.properties, schema=request.schema) clientInfo = self.getClientInfo(graphSetName=requestConfig.graphName, useMaster=requestConfig.useMaster) res = clientInfo.Controlsclient.Export(req, metadata=clientInfo.metadata) res = FormatType.exportResponse(_res=res, timeZone=requestConfig.timeZone, timeZoneOffset=requestConfig.timeZoneOffset) return res except Exception as e: errorRes = ULTIPA_RESPONSE.Response() try: message = str(e._state.code) + ' : ' + str(e._state.details) except: message = 'UNKNOW ERROR' errorRes.status = ULTIPA.Status(code=ULTIPA.Code.UNKNOW_ERROR, message=message) errorRes.req = ULTIPA.ReturnReq(self.graphSetName, "exportData", requestConfig.useHost if requestConfig.useHost else self.host, requestConfig.retry, False) return errorRes
[docs] def uql(self, uql: str, requestConfig: RequestConfig = RequestConfig()) -> ULTIPA_RESPONSE.UltipaResponse: ''' Execute UQL. Args: uql: A uql statement requestConfig: An object of RequestConfig class Returns: UltipaResponse ''' request = ultipa_pb2.UqlRequest() request.uql = uql request.timeout = self.getTimeout(requestConfig.timeoutWithSeconds) if requestConfig.threadNum is not None: request.thread_num = requestConfig.threadNum ultipaRes = ULTIPA_RESPONSE.UltipaResponse() uqlLoggerConfig = self.defaultConfig.uqlLoggerConfig if requestConfig.graphName == '' and self.defaultConfig.defaultGraph != '': requestConfig.graphName = self.defaultConfig.defaultGraph if self.defaultConfig.consistency != self.hostManagerControl.consistency: self.hostManagerControl.consistency = self.defaultConfig.consistency if self.defaultConfig.maxRecvSize != self.hostManagerControl.maxRecvSize: self.hostManagerControl.maxRecvSize = self.defaultConfig.maxRecvSize onRetry = copy.deepcopy(requestConfig.retry) while onRetry.current < onRetry.max: try: import pytz getTimeZoneOffset(requestConfig, self.defaultConfig) timeZone = requestConfig.timeZone if requestConfig.timeZone else self.defaultConfig.timeZone timeZoneOffset = requestConfig.timeZoneOffset if requestConfig.timeZoneOffset else self.defaultConfig.timeZoneOffset timeZoneOffset = getTimeOffsetSeconds(timeZoneOffset) clientInfo = self.getClientInfo(graphSetName=requestConfig.graphName, uql=uql, useHost=requestConfig.useHost, useMaster=requestConfig.useMaster, timezone=timeZone, timeZoneOffset=timeZoneOffset) if uqlLoggerConfig: uqlLoggerConfig.getlogger().info( f'Begin UQL: {uql} graphSetName: {clientInfo.graphSetName} Host: {clientInfo.host}') uqlIsExtra = UQLHelper.uqlIsExtra(uql) if uqlIsExtra: res = clientInfo.Controlsclient.UqlEx(request, metadata=clientInfo.metadata) else: res = clientInfo.Rpcsclient.Uql(request, metadata=clientInfo.metadata) if not requestConfig.stream: ultipaRes = FormatType.uqlMergeResponse(res, timeZone, timeZoneOffset) else: ultipaRes = FormatType.uqlResponse(res, timeZone, timeZoneOffset) if self.defaultConfig.responseWithRequestInfo and not requestConfig.stream: ultipaRes.req = ULTIPA.ReturnReq(clientInfo.graphSetName, uql, clientInfo.host, onRetry, uqlIsExtra) if not isinstance(ultipaRes, types.GeneratorType) and RetryHelp.checkRes(ultipaRes): onRetry.current += 1 continue else: return ultipaRes except Exception as e: onRetry.current += 1 if uqlLoggerConfig: uqlLoggerConfig.getlogger().info( f'Begin Retry [{onRetry.current}]- clientInfo host: {clientInfo.host} graphSetName: {clientInfo.graphSetName}') self.hostManagerControl.getHostManger(requestConfig.graphName).raftReady = False try: message = str(e._state.code) + ' : ' + str(e._state.details) except: message = str(e) ultipaRes.status = ULTIPA.Status(code=ULTIPA.Code.UNKNOW_ERROR, message=message) ultipaRes.req = ULTIPA.ReturnReq(requestConfig.graphName, uql, requestConfig.useHost if requestConfig.useHost else self.host, onRetry, False) return ultipaRes
[docs] def uqlSingle(self, uqlMaker: UQLMAKER) -> ULTIPA_RESPONSE.UltipaResponse: res = self.uql(uqlMaker.toString(), uqlMaker.commonParams) return res
[docs] def UqlListSimple(self, uqlMaker: UQLMAKER, responseKeyFormat: ResponseKeyFormat = None, isSingleOne: bool = True) -> ULTIPA_RESPONSE.UltipaResponse: res = self.uqlSingle(uqlMaker) if res.status.code != ULTIPA.Code.SUCCESS: simplrRes = ULTIPA_RESPONSE.Response(res.status, res.items) return simplrRes if not isSingleOne: retList = [] for alias in res.aliases: item = res.items.get(alias.alias) table = item.data table_rows = table.rows table_rows_dict = convertTableToDict(table_rows, table.headers) if responseKeyFormat: table_rows_dict = responseKeyFormat.changeKeyValue(table_rows_dict) data = convertToListAnyObject(table_rows_dict) retList.append(PropertyTable(name=table.name, data=data)) simplrRes = ULTIPA_RESPONSE.Response(res.status, retList) simplrRes.req = res.req return simplrRes alisFirst = res.aliases[0].alias if len(res.aliases) > 0 else None firstItem = res.items.get(alisFirst) if firstItem: table_rows = firstItem.data.rows table_rows_dict = convertTableToDict(table_rows, firstItem.data.headers) if responseKeyFormat: table_rows_dict = responseKeyFormat.changeKeyValue(table_rows_dict) data = convertToListAnyObject(table_rows_dict) simplrRes = ULTIPA_RESPONSE.Response(res.status, data) simplrRes.req = res.req simplrRes.statistics = res.statistics return simplrRes else: return res
[docs] def UqlUpdateSimple(self, uqlMaker: UQLMAKER): res = self.uqlSingle(uqlMaker) if res.status.code != ULTIPA.Code.SUCCESS: return ULTIPA_RESPONSE.Response(res.status, statistics=res.statistics) if res.req: ret = ULTIPA_RESPONSE.Response(res.status, statistics=res.statistics) ret.req = res.req return ret return ULTIPA_RESPONSE.Response(res.status, statistics=res.statistics)
[docs] def insertNodesBatchBySchema(self, schema: Schema, rows: List[ULTIPA.EntityRow], config: InsertConfig) -> ULTIPA_RESPONSE.InsertResponse: ''' Batch insert nodes of a same schema (that already exists in the graphset). Args: schema: An object of Schema class rows: The data rows to be inserted, List[ULTIPA.EntityRow] config: An object of InsertConfig class Returns: InsertResponse ''' config.useMaster = True if config.graphName == '' and self.defaultConfig.defaultGraph != '': config.graphName = self.defaultConfig.defaultGraph clientInfo = self.getClientInfo(clientType=ClientType.Update, graphSetName=config.graphName, useMaster=config.useMaster) nodetable = FormatType.makeEntityNodeTable(schema, rows, getTimeZoneOffset(requestConfig=config, defaultConfig=self.defaultConfig)) _nodeTable = ultipa_pb2.EntityTable(schemas=nodetable.schemas, entity_rows=nodetable.nodeRows) request = ultipa_pb2.InsertNodesRequest() request.silent = config.silent request.insert_type = config.insertType request.graph_name = config.graphName request.node_table.MergeFrom(_nodeTable) res = clientInfo.Rpcsclient.InsertNodes(request, metadata=clientInfo.metadata) status = FormatType.status(res.status) uqlres = ULTIPA_RESPONSE.Response(status=status) reTry = RetryHelp.check(self, config, uqlres) if reTry.canRetry: config.retry = reTry.nextRetry return self.insertNodesBatchBySchema(schema, rows, config) uRes = ULTIPA_RESPONSE.ResponseBulk() uRes.uuids = [i for i in res.uuids] errorDict = {} for i, data in enumerate(res.ignore_error_code): try: index = rows[res.ignore_indexes[i]]._getIndex() except Exception as e: try: index = res.ignore_indexes[i] except Exception as e: index = i if index is None: try: index = res.ignore_indexes[i] except Exception as e: index = i errorDict.update({index: data}) uRes.errorItem = errorDict uqlres.data = uRes uqlres.total_time = res.time_cost uqlres.engine_time = res.engine_time_cost if self.defaultConfig.responseWithRequestInfo: uqlres.req = ULTIPA.ReturnReq(config.graphName, "InsertNodesBatchBySchema", clientInfo.host, reTry, False) return uqlres
[docs] def insertEdgesBatchBySchema(self, schema: ULTIPA_REQUEST.Schema, rows: List[ULTIPA.EntityRow], config: InsertConfig) -> ULTIPA_RESPONSE.InsertResponse: ''' Batch insert edges of a same schema (that already exists in the graphset) Args: schema: Schema 实例化Schema对象 rows: The data rows to be inserted, List[ULTIPA.EntityRow] config: An object of InsertConfig class Returns: InsertResponse ''' config.useMaster = True if config.graphName == '' and self.defaultConfig.defaultGraph != '': config.graphName = self.defaultConfig.defaultGraph clientInfo = self.getClientInfo(clientType=ClientType.Update, graphSetName=config.graphName, useMaster=config.useMaster) edgetable = FormatType.makeEntityEdgeTable(schema=schema, rows=rows, timeZoneOffset=getTimeZoneOffset(requestConfig=config, defaultConfig=self.defaultConfig)) _edgeTable = ultipa_pb2.EntityTable(schemas=edgetable.schemas, entity_rows=edgetable.edgeRows) request = ultipa_pb2.InsertEdgesRequest() request.silent = config.silent request.insert_type = config.insertType request.graph_name = config.graphName request.create_node_if_not_exist = config.createNodeIfNotExist request.edge_table.MergeFrom(_edgeTable) res = clientInfo.Rpcsclient.InsertEdges(request, metadata=clientInfo.metadata) status = FormatType.status(res.status) uqlres = ULTIPA_RESPONSE.Response(status=status) reTry = RetryHelp.check(self, config, uqlres) if reTry.canRetry: config.retry = reTry.nextRetry return self.insertEdgesBatchBySchema(schema, rows, config) uRes = ULTIPA_RESPONSE.ResponseBulk() uRes.uuids = [i for i in res.uuids] errorDict = {} for i, data in enumerate(res.ignore_error_code): try: index = rows[res.ignore_indexes[i]]._getIndex() except Exception as e: try: index = res.ignore_indexes[i] except Exception as e: index = i if index is None: try: index = res.ignore_indexes[i] except Exception as e: index = i errorDict.update({index: data}) uRes.errorItem = errorDict uqlres.data = uRes uqlres.total_time = res.time_cost uqlres.engine_time = res.engine_time_cost if self.defaultConfig.responseWithRequestInfo: uqlres.req = ULTIPA.ReturnReq(config.graphName, "InsertEdgesBatchBySchema", clientInfo.host, reTry, False) return uqlres
[docs] def insertNodesBatchAuto(self, nodes: List[ULTIPA.EntityRow], config: InsertConfig) -> ULTIPA_RESPONSE.ResponseBatchAutoInsert: ''' Batch insert nodes of different schemas (that will be created if not existent) Args: nodes: The data rows to be inserted, List[ULTIPA.EntityRow] config: An object of InsertConfig class Returns: ResponseBatchAutoInsert ''' Result = {} schemaDict = {} batches = {} schemaRet = self.uql(GetPropertyBySchema.node, config) if schemaRet.status.code == ULTIPA.Code.SUCCESS: for aliase in schemaRet.aliases: if aliase.alias == '_nodeSchema': schemaDict = convertTableToDict(schemaRet.alias(aliase.alias).data.rows, schemaRet.alias(aliase.alias).data.headers) if not schemaDict: raise ParameterException(err='Please create Node Schema.') else: raise ParameterException(err=schemaRet.status.message) for index, node in enumerate(nodes): node._index = index if batches.get(node.schema) is None: batches[node.schema] = ULTIPA_REQUEST.Batch() find = list(filter(lambda x: x.get('name') == node.schema, schemaDict)) if find: findSchema = find[0] propertyList = FormatType.checkProperty(node, json.loads(findSchema.get("properties"))) reqSchema = ULTIPA_REQUEST.Schema(name=node.schema, properties=propertyList,dbType=DBType.DBNODE) batches[node.schema].Schema = reqSchema else: if node.schema is None: raise ParameterException(err=f"Row [{index}]:Please set schema name for node.") else: raise ParameterException(err=f"Row [{index}]:Node Schema not found {node.schema}.") batches.get(node.schema).Nodes.append(node) for key in batches: batch = batches.get(key) Result.update( {key: self.insertNodesBatchBySchema(schema=batch.Schema, rows=batch.Nodes, config=config)}) newStatusMsg = "" newCode = None for i, key in enumerate(Result): ret = Result.get(key) newStatusMsg += f"{key}:{ret.status.message} " if ret.status.code != ULTIPA.Code.SUCCESS and not newCode: newCode = ret.status.code if newCode is None: newCode = ULTIPA.Code.SUCCESS status = ULTIPA_RESPONSE.Status(newCode, newStatusMsg) newResponse = ULTIPA_RESPONSE.ResponseBatchAutoInsert(status=status) newResponse.data = Result return newResponse
[docs] def insertEdgesBatchAuto(self, edges: List[ULTIPA.EntityRow], config: InsertConfig) -> ULTIPA_RESPONSE.ResponseBatchAutoInsert: ''' Batch insert edges of different schemas (that will be created if not existent) Args: edges: The data rows to be inserted, List[ULTIPA.EntityRow] config: An object of InsertConfig class Returns: ResponseBatchAutoInsert ''' Result = {} schemaDict = [] batches = {} schemaRet = self.uql(GetPropertyBySchema.edge, config) if schemaRet.status.code == ULTIPA.Code.SUCCESS: for aliase in schemaRet.aliases: if aliase.alias == '_edgeSchema': schemaDict = convertTableToDict(schemaRet.alias(aliase.alias).data.rows, schemaRet.alias(aliase.alias).data.headers) if not schemaDict: raise ParameterException(err='Please create Edge Schema.') else: raise ParameterException(err=schemaRet.status.message) for index, edge in enumerate(edges): edge._index = index if batches.get(edge.schema) == None: batches[edge.schema] = ULTIPA_REQUEST.Batch() find = list(filter(lambda x: x.get('name') == edge.schema, schemaDict)) if find: findSchema = find[0] propertyList = FormatType.checkProperty(edge, json.loads(findSchema.get("properties"))) reqSchema = ULTIPA_REQUEST.Schema(name=edge.schema, properties=propertyList,dbType=DBType.DBEDGE) batches[edge.schema].Schema = reqSchema else: if edge.schema is None: raise ParameterException(err=f"Row [{index}]:Please set schema name for edge.") else: raise ParameterException(err=f"Row [{index}]:Edge Schema not found {edge.schema}.") batches.get(edge.schema).Edges.append(edge) for key in batches: batch = batches.get(key) Result.update( {key: self.insertEdgesBatchBySchema(schema=batch.Schema, rows=batch.Edges, config=config)}) newStatusMsg = "" newCode = None for i, key in enumerate(Result): ret = Result.get(key) newStatusMsg += f"{key}:{ret.status.message} " if ret.status.code != ULTIPA.Code.SUCCESS and not newCode: newCode = ret.status.code if newCode is None: newCode = ULTIPA.Code.SUCCESS status = ULTIPA_RESPONSE.Status(newCode, newStatusMsg) newResponse = ULTIPA_RESPONSE.ResponseBatchAutoInsert(status=status) newResponse.data = Result return newResponse
def _InsertByCSV(self, csvPath: str, type: DBType, config: InsertConfig, schemaName: str = None) -> ULTIPA_RESPONSE.ResponseBatchAutoInsert: rows = [] propertyType = [] properties = [] types = [] with open(csvPath, "r", encoding="utf-8-sig") as csvfile: reader = csv.reader(csvfile) for i, line in enumerate(reader): if i == 0: for i, property in enumerate(line): k1, k2 = property.split(":") propertyType.append({k1: k2}) types.append({"index": i, "type": k2}) properties.append(k1) continue for i in types: if len(line) == 0: continue if i.get("type") in ["int", "int32", "int64","uint32","uint64"]: try: if line[i.get("index")] == "": line[i.get("index")] = 0 continue line[i.get("index")] = int(line[i.get("index")]) except Exception as e: print(e) if i.get("type") in ["float", "double"]: if line[i.get("index")] == "": line[i.get("index")] = 0.0 continue line[i.get("index")] = float(line[i.get("index")]) line = dict(zip(properties, line)) if i == 0: print(line.keys()) if type == DBType.DBNODE: if line.get("_uuid"): uuid = line.get("_uuid") line.__delitem__("_uuid") if line.get("_id"): line.__delitem__("_id") rows.append(ULTIPA.Node(line, schema_name=schemaName, uuid=int(uuid))) elif line.get("_id"): id = line.get("_id") line.__delitem__("_id") rows.append(ULTIPA.Node(line, schema_name=schemaName, id=id)) else: rows.append(ULTIPA.Node(line, schema_name=schemaName)) elif type == DBType.DBEDGE: if line.get("_from_uuid") and line.get("_to_uuid"): from_uuid = line.get("_from_uuid") line.__delitem__("_from_uuid") to_uuid = line.get("_to_uuid") line.__delitem__("_to_uuid") if line.get("_id"): line.__delitem__("_id") if line.get("_uuid"): line.__delitem__("_uuid") if line.get("_from"): line.__delitem__("_from") if line.get("_to"): line.__delitem__("_to") rows.append( ULTIPA.Edge(line, schema_name=schemaName, from_uuid=int(from_uuid), to_uuid=int(to_uuid))) elif line.get("_from") and line.get("_to"): from_id = line.get("_from") line.__delitem__("_from") to_id = line.get("_to") line.__delitem__("_to") if line.get("_id"): line.__delitem__("_id") if line.get("_uuid"): line.__delitem__("_uuid") if line.get("_from_uuid"): line.__delitem__("_from_uuid") if line.get("_to_uuid"): line.__delitem__("_to_uuid") rows.append(ULTIPA.Edge(line, schema_name=schemaName, from_id=from_id, to_id=to_id)) if type == DBType.DBNODE: return self.insertNodesBatchAuto(rows, config) else: return self.insertEdgesBatchAuto(rows, config)