Source code for ultipa.operate.download_extra
from typing import List
from ultipa.operate.base_extra import BaseExtra
from ultipa.proto import ultipa_pb2
from ultipa.types import ULTIPA_REQUEST, ULTIPA_RESPONSE, ULTIPA
from ultipa.utils.format import FormatType
from ultipa.configuration.RequestConfig import RequestConfig
[docs]
class DownloadExtra(BaseExtra):
'''
Processing class that defines settings for file downloading operation.
'''
def _download(self, request: ULTIPA_REQUEST.Download,
requestConfig: RequestConfig = RequestConfig()) -> List[
ULTIPA_RESPONSE.Response]:
'''
Download files (for internal use).
Args:
request: An object of Download class
requestConfig: An object of RequestConfig class
with open('./XXXXX.csv', 'ab+') as f:
for data_flow in ret:
data = data_flow.chunk
f.write(data)
Returns:
stream
'''
downResponse = ULTIPA_RESPONSE.Response()
try:
clientInfo = self.getClientInfo(graphSetName=requestConfig.graphName, useMaster=requestConfig.useMaster)
res = clientInfo.Controlsclient.DownloadFileV2(
ultipa_pb2.DownloadFileRequestV2(file_name=request.fileName, task_id=request.taskId),
metadata=clientInfo.metadata)
if request.savePath:
with open(request.savePath, 'wb+') as f:
for data_flow in res:
status = FormatType.status(data_flow.status)
downResponse.status = status
if status.code != ULTIPA.Code.SUCCESS:
yield downResponse
break
data = data_flow.chunk
f.write(data)
yield downResponse
else:
for data_flow in res:
ultipa_response = ULTIPA_RESPONSE.Response()
status = FormatType.status(data_flow.status)
ultipa_response.status = status
if status.code != ULTIPA.Code.SUCCESS:
yield ultipa_response
break
ultipa_response.data = data_flow.chunk
yield ultipa_response
except Exception as e:
downResponse.status = ULTIPA.Status(code=ULTIPA.Code.UNKNOW_ERROR, message=str(e._state.details))
yield downResponse
[docs]
def download(self, request: ULTIPA_REQUEST.Download,
requestConfig: RequestConfig = RequestConfig()) -> List[
ULTIPA_RESPONSE.Response]:
'''
Download files.
Args:
request: An object of Download class
requestConfig: An object of RequestConfig class
with open('./XXXXX.csv', 'ab+') as f:
for data_flow in ret:
data = data_flow.chunk
f.write(data)
Returns:
stream
'''
return list(self._download(request,requestConfig))