# Source code for ultipa.operate.user_extra
from typing import Optional

from ultipa.operate.base_extra import BaseExtra
from ultipa.utils import UQLMAKER, CommandList
from ultipa.types import ULTIPA_REQUEST, ULTIPA_RESPONSE
from ultipa.utils.errors import ParameterException
from ultipa.utils.format import FormatType
from ultipa.utils.ResposeFormat import ResponseKeyFormat
from ultipa.configuration.RequestConfig import RequestConfig
# Key names handed to ResponseKeyFormat as ``jsonKeys`` by the user queries
# in this module (presumably the response fields holding JSON-encoded
# strings -- confirm against ResponseKeyFormat).
JSONSTRING_KEYS = [
    "graphPrivileges",
    "systemPrivileges",
    "policies",
    "policy",
    "privilege",
]

# Field names handed to ResponseKeyFormat as ``dataFormat`` by the user
# queries in this module.
formatdata = ["graph_privileges"]
# [docs]
class UserExtra(BaseExtra):
    '''
    Processing class that defines settings for user related operations.

    Every public method accepts an optional ``requestConfig``. When it is
    omitted a new ``RequestConfig()`` is created for that call, so the
    default configuration object is never shared between calls.
    '''

    def _GRPATH_PRIVILEGES_DATA_FORMAT(self, obj):
        '''
        Format the ``graph_privileges`` entry of a record.

        Args:
            obj: An object exposing ``get`` (e.g. a dict)

        Returns:
            The result of ``FormatType.graphPrivileges`` when the
            ``graph_privileges`` entry is a list, otherwise the string ``'[]'``
        '''
        privileges = obj.get('graph_privileges')
        if isinstance(privileges, list):
            return FormatType.graphPrivileges(privileges)
        return '[]'

    def showUser(self,
                 requestConfig: Optional[RequestConfig] = None) -> ULTIPA_RESPONSE.ResponseListUser:
        '''
        Show user list.

        Args:
            requestConfig: An object of RequestConfig class; a new
                RequestConfig() is used when omitted

        Returns:
            ResponseListUser
        '''
        # Build the default per call instead of in the signature, where it
        # would be evaluated once and shared by every call.
        if requestConfig is None:
            requestConfig = RequestConfig()
        uqlMaker = UQLMAKER(command=CommandList.showUser, commonParams=requestConfig)
        keyFormat = ResponseKeyFormat(jsonKeys=JSONSTRING_KEYS, dataFormat=formatdata)
        return self.UqlListSimple(uqlMaker=uqlMaker, responseKeyFormat=keyFormat)

    def getUser(self, username: str,
                requestConfig: Optional[RequestConfig] = None) -> ULTIPA_RESPONSE.ResponseUser:
        '''
        Get a designated user.

        Args:
            username: The name of user
            requestConfig: An object of RequestConfig class; a new
                RequestConfig() is used when omitted

        Returns:
            ResponseUser. When the response data is a non-empty list it is
            replaced by its first element; otherwise it is left as returned.
        '''
        # Build the default per call instead of in the signature, where it
        # would be evaluated once and shared by every call.
        if requestConfig is None:
            requestConfig = RequestConfig()
        uqlMaker = UQLMAKER(command=CommandList.getUser, commonParams=requestConfig)
        uqlMaker.setCommandParams(username)
        keyFormat = ResponseKeyFormat(jsonKeys=JSONSTRING_KEYS, dataFormat=formatdata)
        res = self.UqlListSimple(uqlMaker=uqlMaker, responseKeyFormat=keyFormat)
        # Unwrap the single requested user from the list-shaped result.
        if isinstance(res.data, list) and res.data:
            res.data = res.data[0]
        return res

    def getSelfInfo(self,
                    requestConfig: Optional[RequestConfig] = None) -> ULTIPA_RESPONSE.UltipaResponse:
        '''
        Get the current user.

        Args:
            requestConfig: An object of RequestConfig class; a new
                RequestConfig() is used when omitted

        Returns:
            UltipaResponse
        '''
        # Build the default per call instead of in the signature, where it
        # would be evaluated once and shared by every call.
        if requestConfig is None:
            requestConfig = RequestConfig()
        uqlMaker = UQLMAKER(command=CommandList.getSelfInfo, commonParams=requestConfig)
        keyFormat = ResponseKeyFormat(jsonKeys=JSONSTRING_KEYS, dataFormat=formatdata)
        return self.UqlListSimple(uqlMaker=uqlMaker, responseKeyFormat=keyFormat)

    def createUser(self, request: ULTIPA_REQUEST.CreateUser,
                   requestConfig: Optional[RequestConfig] = None) -> ULTIPA_RESPONSE.UltipaResponse:
        '''
        Create a user.

        Args:
            request: An object of CreateUser class
            requestConfig: An object of RequestConfig class; a new
                RequestConfig() is used when omitted

        Returns:
            UltipaResponse

        Raises:
            ParameterException: If ``request.username`` or
                ``request.password`` is empty
        '''
        # Build the default per call instead of in the signature, where it
        # would be evaluated once and shared by every call.
        if requestConfig is None:
            requestConfig = RequestConfig()
        uqlMaker = UQLMAKER(command=CommandList.createUser, commonParams=requestConfig)
        if not request.username:
            raise ParameterException(err='username is a required parameter')
        if not request.password:
            raise ParameterException(err='password is a required parameter')
        # Positional command parameters: username, password, graph
        # privileges, system privileges, policies. Empty optional values are
        # sent as an empty dict / list so the positions stay fixed.
        params = [
            request.username,
            request.password,
            request.graph_privileges or {},
            request.system_privileges or [],
            request.policies or [],
        ]
        uqlMaker.setCommandParams(params)
        return self.uqlSingle(uqlMaker=uqlMaker)

    def dropUser(self, username: str,
                 requestConfig: Optional[RequestConfig] = None) -> ULTIPA_RESPONSE.UltipaResponse:
        '''
        Drop a user.

        Args:
            username: The name of user
            requestConfig: An object of RequestConfig class; a new
                RequestConfig() is used when omitted

        Returns:
            UltipaResponse
        '''
        # Build the default per call instead of in the signature, where it
        # would be evaluated once and shared by every call.
        if requestConfig is None:
            requestConfig = RequestConfig()
        uqlMaker = UQLMAKER(command=CommandList.dropUser, commonParams=requestConfig)
        uqlMaker.setCommandParams(username)
        return self.uqlSingle(uqlMaker=uqlMaker)

    def alterUser(self, request: ULTIPA_REQUEST.AlterUser,
                  requestConfig: Optional[RequestConfig] = None) -> ULTIPA_RESPONSE.UltipaResponse:
        '''
        Alter a user.

        Args:
            request: An object of AlterUser class
            requestConfig: An object of RequestConfig class; a new
                RequestConfig() is used when omitted

        Returns:
            UltipaResponse

        Raises:
            ParameterException: If ``request.username`` is empty
        '''
        # Build the default per call instead of in the signature, where it
        # would be evaluated once and shared by every call.
        if requestConfig is None:
            requestConfig = RequestConfig()
        uqlMaker = UQLMAKER(command=CommandList.alterUser, commonParams=requestConfig)
        if not request.username:
            raise ParameterException(err='username is a required parameter')
        uqlMaker.setCommandParams(request.username)
        # Only non-empty fields are placed in the 'set' parameter; the dict
        # is still added when no field is set.
        paramsDict = {}
        for field in ('password', 'graph_privileges', 'system_privileges', 'policies'):
            value = getattr(request, field)
            if value:
                paramsDict[field] = value
        uqlMaker.addParam('set', paramsDict)
        return self.uqlSingle(uqlMaker=uqlMaker)