Source code for ultipa.operate.node_extra
from typing import Optional

from ultipa.operate.base_extra import BaseExtra
from ultipa.types import ULTIPA_REQUEST, ULTIPA, ULTIPA_RESPONSE
from ultipa.utils import UQLMAKER, CommandList
from ultipa.configuration.RequestConfig import RequestConfig
[docs]
class NodeExtra(BaseExtra):
    '''
    Processing class that defines settings for node related operations.

    Every public method builds a UQLMAKER for one node command, fills it in
    from the request object and runs it through ``self.uqlSingle``.
    '''

    @staticmethod
    def _applyIdOrFilter(uqlMaker, request) -> None:
        '''
        Set the command parameters from the request's id or, failing that, its filter.

        Args:
            uqlMaker: The UQLMAKER being populated

            request: A request object exposing ``id`` and ``filter`` attributes

        Returns:
            None

        '''
        # The id takes precedence; the filter is only consulted when id is falsy.
        # NOTE(review): when both are falsy no command parameters are set and the
        # command is still sent by the caller - confirm that is the intended
        # behaviour for update/delete.
        if request.id:
            uqlMaker.setCommandParams(request.id)
        elif request.filter:
            uqlMaker.setCommandParams(request.filter)

    def searchNode(self, request: ULTIPA_REQUEST.SearchNode,
                   requestConfig: Optional[RequestConfig] = None) -> ULTIPA_RESPONSE.ResponseSearchNode:
        '''
        Query for nodes.

        Args:
            request: An object of SearchNode class

            requestConfig: An object of RequestConfig class; a new RequestConfig()
                is created for the call when omitted

        Returns:
            ResponseSearchNode

        '''
        # A fresh config per call: a default instance in the signature would be
        # created once and shared by every invocation.
        if requestConfig is None:
            requestConfig = RequestConfig()

        uqlMaker = UQLMAKER(command=CommandList.nodes, commonParams=requestConfig)
        self._applyIdOrFilter(uqlMaker, request)
        uqlMaker.addParam('as', request.select.aliasName)
        uqlMaker.addParam("return", request.select)
        # The response is handed back as-is whether or not the status is SUCCESS.
        return self.uqlSingle(uqlMaker)

    def insertNode(self, request: ULTIPA_REQUEST.InsertNode,
                   requestConfig: Optional[RequestConfig] = None) -> ULTIPA_RESPONSE.ResponseInsert:
        '''
        Insert nodes.

        Args:
            request: An object of InsertNode class

            requestConfig: An object of RequestConfig class; a new RequestConfig()
                is created for the call when omitted

        Returns:
            ResponseInsert

        '''
        if requestConfig is None:
            requestConfig = RequestConfig()

        # Pick the command first so only one UQLMAKER is ever constructed.
        command = CommandList.upsert if request.upsert else CommandList.insert
        uqlMaker = UQLMAKER(command=command, commonParams=requestConfig)

        if request.overwrite:
            uqlMaker.addParam('overwrite', "", required=False)
        if request.schemaName:
            uqlMaker.addParam('into', request.schemaName, required=False)
        uqlMaker.addParam('nodes', request.nodes)

        if request.isReturnID:
            uqlMaker.addParam('as', "nodes")
            uqlMaker.addParam('return', "nodes._uuid")

        res = self.uqlSingle(uqlMaker)
        if res.status.code != ULTIPA.Code.SUCCESS:
            return res

        # When ids were requested, expose the item stored under the first
        # returned alias directly on res.data.
        if request.isReturnID and res.aliases:
            res.data = res.items.get(res.aliases[0].alias).data
        return res

    def updateNode(self, request: ULTIPA_REQUEST.UpdateNode,
                   requestConfig: Optional[RequestConfig] = None) -> ULTIPA_RESPONSE.UltipaResponse:
        '''
        Update nodes.

        Args:
            request: An object of UpdateNode class

            requestConfig: An object of RequestConfig class; a new RequestConfig()
                is created for the call when omitted

        Returns:
            UltipaResponse

        '''
        if requestConfig is None:
            requestConfig = RequestConfig()

        uqlMaker = UQLMAKER(command=CommandList.updateNodes, commonParams=requestConfig)
        self._applyIdOrFilter(uqlMaker, request)
        uqlMaker.addParam("set", request.values)
        uqlMaker.addParam("silent", request.silent)
        return self.uqlSingle(uqlMaker)

    def deleteNode(self, request: ULTIPA_REQUEST.DeleteNode,
                   requestConfig: Optional[RequestConfig] = None) -> ULTIPA_RESPONSE.UltipaResponse:
        '''
        Delete nodes.

        Args:
            request: An object of DeleteNode class

            requestConfig: An object of RequestConfig class; a new RequestConfig()
                is created for the call when omitted

        Returns:
            UltipaResponse

        '''
        if requestConfig is None:
            requestConfig = RequestConfig()

        uqlMaker = UQLMAKER(command=CommandList.deleteNodes, commonParams=requestConfig)
        self._applyIdOrFilter(uqlMaker, request)
        return self.uqlSingle(uqlMaker)