Source code for ultipa.operate.subGraph_extra
# -*- coding: utf-8 -*-
# @Time : 2023/8/26 12:05
# @Author : Ultipa
# @Email : [email protected]
# @File : subGraph_extra.py
import time
from ultipa.configuration.InsertConfig import InsertConfig
from ultipa.configuration.RequestConfig import RequestConfig
from ultipa.operate.graph_extra import GraphExtra
from ultipa.operate.property_extra import PropertyExtra
from ultipa.operate.schema_extra import SchemaExtra
from ultipa.structs import Graph, DBType, EntityRow, InsertType
from ultipa.types import ULTIPA_RESPONSE
[docs]
class SubGraphExtra(GraphExtra,SchemaExtra,PropertyExtra):
'''
Processing class that defines settings for subgraph creating operation.
'''
[docs]
def uqlCreateSubgraph(self, uql: str, subGraphName: str, requestConfig: RequestConfig = RequestConfig()) -> ULTIPA_RESPONSE.UltipaResponse:
'''
Create subgraph.
Args:
uql: A uql statement that returns paths.
subGraphName: Tje name of subgraph, will be auto-created if not existent
requestConfig: An object of RequestConfig class
Returns:
'''
ret = self.uql(uql, requestConfig)
if ret.status.code != 0:
return ret
graphRet = self.getGraph(subGraphName)
if graphRet.status.code != 0:
self.createGraph(Graph(graph=subGraphName))
time.sleep(3)
dataItems = ret.get(0).asPaths()
nodeSchemasDict={}
edgeSchemasDict={}
schemaNodes = {}
schemaEdges = {}
for data in dataItems:
for nodeSchmemaKey in data.nodeSchemas.keys():
if nodeSchmemaKey in nodeSchemasDict:
continue
dbType = DBType.DBNODE
nodeSchema = data.nodeSchemas.get(nodeSchmemaKey)
nodeSchema.DBType = dbType
if self.showSchema(dbType,nodeSchmemaKey,requestConfig=RequestConfig(graphName=subGraphName)).status.code==0:
for property in nodeSchema.properties:
property.type = property.getPropertyTypeByString(property.type)
subty = []
for i in property.subTypes:
subty.append(property.getPropertyTypeByString(i))
property.subTypes = subty
nodeSchemasDict.update({nodeSchmemaKey: nodeSchema})
continue
self.createSchema(nodeSchema,requestConfig=RequestConfig(graphName=subGraphName))
for property in nodeSchema.properties:
self.createProperty(dbType,nodeSchmemaKey,property,requestConfig=RequestConfig(graphName=subGraphName))
for property in nodeSchema.properties:
property.type = property.getPropertyTypeByString(property.type)
subty = []
for i in property.subTypes:
subty.append(property.getPropertyTypeByString(i))
property.subTypes = subty
nodeSchemasDict.update({nodeSchmemaKey:nodeSchema})
for edgeSchmemaKey in data.edgeSchemas.keys():
if edgeSchmemaKey in edgeSchemasDict:
continue
dbType = DBType.DBEDGE
edgeSchema = data.edgeSchemas.get(edgeSchmemaKey)
edgeSchema.DBType = dbType
if self.showSchema(dbType,edgeSchmemaKey,requestConfig=RequestConfig(graphName=subGraphName)).status.code==0:
for property in edgeSchema.properties:
property.type = property.getPropertyTypeByString(property.type)
subty = []
for i in property.subTypes:
subty.append(property.getPropertyTypeByString(i))
property.subTypes = subty
edgeSchemasDict.update({edgeSchmemaKey: edgeSchema})
continue
self.createSchema(edgeSchema, requestConfig=RequestConfig(graphName=subGraphName))
for property in edgeSchema.properties:
self.createProperty(dbType,edgeSchmemaKey,property,requestConfig=RequestConfig(graphName=subGraphName))
for property in edgeSchema.properties:
property.type = property.getPropertyTypeByString(property.type)
subty = []
for i in property.subTypes:
subty.append(property.getPropertyTypeByString(i))
property.subTypes = subty
edgeSchemasDict.update({edgeSchmemaKey:edgeSchema})
for node in data.nodes:
if schemaNodes.get(node.schema):
schemaNodes.get(node.schema).append(EntityRow(id=node.id,values=node.values))
else:
schemaNodes.update({node.schema:[EntityRow(id=node.id,values=node.values)]})
for edge in data.edges:
if schemaEdges.get(edge.schema):
schemaEdges.get(edge.schema).append(EntityRow(from_id=edge.from_id,to_id=edge.to_id,values=edge.values))
else:
schemaEdges.update({edge.schema:[EntityRow(from_id=edge.from_id,to_id=edge.to_id,values=edge.values)]})
for nodeKey in schemaNodes:
self.insertNodesBatchBySchema(schema=nodeSchemasDict.get(nodeKey),rows=schemaNodes.get(nodeKey),config=InsertConfig(InsertType.UPSERT,graphName = subGraphName))
for edgeKey in schemaEdges:
self.insertEdgesBatchBySchema(schema=edgeSchemasDict.get(edgeKey), rows=schemaEdges.get(edgeKey),
config=InsertConfig(InsertType.UPSERT, graphName=subGraphName))
return ret