新加的内容
作者:互联网
# Copyright 2019-present Lenovo
# Confidential and Proprietary

"""Tests for the Jupyter devtools REST views with the 'kube' operator."""

import base64
import json

from py.path import local
from pytest import mark
from rest_framework.status import HTTP_200_OK

# Dotted paths of the collaborators that the tests replace with mocks.
_GENERATE_JOBFILE = 'antilles.scheduler_ext.models.Template.generate_jobfile'
_REQUEST_CREATE_JOB = (
    'antilles.scheduler.job_proxy.JobManagerProxy.request_create_job'
)
_GET_WITH_COMMIT = 'antilles.scheduler.job_proxy.JobObjectsQuery.get_with_commit'
_FILTER_WITH_COMMIT = (
    'antilles.scheduler.job_proxy.JobObjectsQuery.filter_with_commit'
)
_GET_USER_CONTEXT = 'antilles.kubernetes.utils.get_user_context'

_INTERNAL_NAME = 'c12c00ec9d4c11e9ac32000c29fde54a'


def _test_dir(user):
    """Return the path of the 'test' directory inside the user's workspace."""
    return str(local(user.workspace).join('test'))


def _job_record(job_id, status):
    """Build one scheduler job record as returned by the mocked job query."""
    return {
        "status": status,
        "exechosts": "localhost*1",
        "cpuscount": 1,
        "jobname": "test",
        "gpuscount": 2,
        "starttime": 1561611663,
        "endtime": 1561611665,
        "id": job_id,
        "json_body": "test",
        "submiter": "test",
        "priority": "test"
    }


def _patch_ingress(mocker):
    """Mock the kubernetes user context so it reports a fixed ingress URL."""
    return mocker.patch(
        _GET_USER_CONTEXT,
        return_value={"ingress_ctrl_addr": "http://127.0.0.1:8000"}
    )


@mark.django_db
def test_template_job_for_kube(client, mocker, settings, user):
    """POST /submit/ renders the job file and submits a jupyter job."""
    settings.LICO_OPERATOR = 'kube'
    # Imported after the setting is switched, as in the original test.
    from antilles.common.utils import encode_base64url

    ingress_ctrl_addr = _patch_ingress(mocker)
    jobfile_mock = mocker.patch(
        _GENERATE_JOBFILE,
        return_value={
            "job_name": "pv2_test1",
            "jobfilecontent": "123",
            "job_log_file": "",
            "job_workspace": _test_dir(user),
            "internal_name": _INTERNAL_NAME
        }
    )
    mock_create_job = mocker.patch(
        _REQUEST_CREATE_JOB,
        return_value={'id': 102, "ports": {"12": 50000}}
    )
    password = '123456'
    mocker.patch('antilles.devtools.utils.passwd', return_value=password)

    data = {
        "job_queue": "compute",
        "job_workspace": 'test',
        "language": "py2",
        # b64encode needs bytes on Python 3 and returns bytes, which
        # json.dumps cannot serialise; decode to text so the payload is
        # valid on both Python 2 and Python 3.
        "password": base64.b64encode(b"123456").decode("ascii"),
        "job_name": "pv2_test1",
        "image_path": "/home/hpcadmin/new_image_628/jupyter-cpu.image",
        "nodes": 1,
        "conda_env": "test"
    }
    response = client.post(
        '/submit/',
        data=json.dumps(data),
        content_type="application/json",
    )

    assert response.status_code == HTTP_200_OK
    ingress_ctrl_addr.assert_called_once()
    jobfile_mock.assert_called_once_with(
        user,
        dict(
            data,
            password=password,
            job_workspace=_test_dir(user),
            conda_env=_test_dir(user),
            host='lico.lenovo.com',
            base_url=encode_base64url('127.0.0.1:8000'),
            container_port=6006,
            command='/opt/start'
        )
    )
    mock_create_job.assert_called_once_with(
        workspace=_test_dir(user),
        jobname=data['job_name'],
        jobfilecontent='123',
        type="jupyter",
        json_body=mocker.ANY,
        output_file='',
        submitter=user
    )


@mark.django_db
def test_jupyter_list_for_kube(client, settings, user, jupyter, mocker):
    """GET / lists the user's jupyter jobs with running ones first."""
    settings.LICO_OPERATOR = 'kube'
    ingress_ctrl_addr = _patch_ingress(mocker)

    from antilles.devtools.models import Devtools
    jupyter1 = Devtools.objects.create(
        user=user, password='123456', port=80001, language='pv2',
        uuid='88e20b0e9be011e994be000c29fde54a', job=2
    )
    jupyter2 = Devtools.objects.create(
        user=user, password='123456', port=80003, language='pv2',
        uuid='88e20b0e9be011e994be000c29fde54a', job=3
    )
    mock_job_query = mocker.patch(
        _FILTER_WITH_COMMIT,
        return_value=[
            _job_record(jupyter.job, "complete"),
            _job_record(jupyter1.job, "running"),
            _job_record(jupyter2.job, "running"),
        ]
    )

    response = client.get('/')

    expect_list = sorted([
        'uuid', 'endtime', 'create_time', 'id', 'status', 'exechosts',
        'gpuscount', 'jobname', 'language', 'port', 'jupyter_id', 'job',
        'cpuscount'
    ])
    assert response.status_code == HTTP_200_OK
    rows = response.data['data']
    for row in rows[:3]:
        assert sorted(row.keys()) == expect_list

    # The two running jobs come first (in either order), then the rest.
    running_jobs = [jupyter1.job, jupyter2.job]
    assert rows[0]['status'] == "running"
    assert rows[1]['status'] == "running"
    assert rows[0]['id'] in running_jobs
    assert rows[1]['id'] in running_jobs
    assert rows[2]['status'] == "complete"
    assert rows[2]['id'] == jupyter.job
    mock_job_query.assert_called_once()
    ingress_ctrl_addr.assert_called()


@mark.django_db
def test_jupyter_rerun_for_kube(client, settings, user, mocker, jupyter):
    """PUT /1/ resubmits the job using the parameters stored with it."""
    settings.LICO_OPERATOR = 'kube'
    # Parameters stored in the old job's json_body; the view is expected to
    # hand exactly this dict back to the template.
    parameters = {
        "job_queue": "compute",
        "job_workspace": _test_dir(user),
        "py_version": "py2",
        "password": "123456",
        "job_name": "pv2_test1",
        "image_path": "/home/hpcadmin/jupyter-cpu.image",
        "nodes": 1,
        "conda_env": _test_dir(user),
        "host": "lico.lenovo.com",
        "base_url": '127.0.0.1:8000',
        "container_port": 6006,
        "command": "/opt/start"
    }
    jobfile_mock = mocker.patch(
        _GENERATE_JOBFILE,
        return_value={
            "internal_name": _INTERNAL_NAME,
            "job_workspace": _test_dir(user),
            "job_name": "pv2_test1",
            "jobfilecontent": "123",
            "job_log_file": "123"
        }
    )
    mock_create_job = mocker.patch(
        _REQUEST_CREATE_JOB,
        return_value={'id': 100, "ports": {"13": 50000}}
    )
    mock_job_query = mocker.patch(
        _GET_WITH_COMMIT,
        return_value={"json_body": json.dumps({"parameters": parameters})}
    )

    response = client.put('/1/')

    from antilles.devtools.models import Devtools
    jupyter1 = Devtools.objects.get(id=1)
    mock_job_query.assert_called_once()
    assert response.status_code == HTTP_200_OK
    assert jupyter1.job == 100
    assert jupyter1.uuid == 'u' + _INTERNAL_NAME
    jobfile_mock.assert_called_once_with(user, parameters)
    mock_create_job.assert_called_once_with(
        workspace=_test_dir(user),
        jobname="pv2_test1",
        jobfilecontent='123',
        type="jupyter",
        json_body=mocker.ANY,
        output_file='123',
        submitter=user
    )


@mark.django_db
def test_jupyter_delete(client, settings, user, mocker, jupyter):
    """DELETE /1/ with delete_completely resets the env and drops the row."""
    settings.LICO_OPERATOR = 'kube'
    mock_job_query = mocker.patch(
        _GET_WITH_COMMIT,
        return_value={
            "json_body": json.dumps({
                "parameters": {
                    "job_queue": "compute",
                    "job_workspace": 'test',
                    "py_version": "py2",
                    "password": "123456",
                    "job_name": "pv2_test1",
                    "image_path": "/home/hpcadmin/jupyter-cpu.image",
                    "nodes": 1,
                    "conda_env": 'test'
                }
            })
        }
    )
    # Named file_operator (not "operator") to avoid confusion with the
    # standard-library module of that name.
    file_operator = mocker.patch(
        'antilles.devtools.devtools.get_file_operator'
    ).return_value
    file_operator.path_exists.return_value = True
    file_operator.path_isdir.return_value = True

    response = client.delete(
        '/1/',
        data=json.dumps({"delete_completely": True}),
        content_type="application/json"
    )

    from antilles.devtools.models import Devtools
    mock_job_query.assert_called_once()
    file_operator.mkdir.assert_called_once_with(_test_dir(user))
    file_operator.rmtree.assert_called_once_with(_test_dir(user))
    assert file_operator.chown.call_count == 1
    assert response.status_code == HTTP_200_OK
    assert not Devtools.objects.filter(id=1).exists()

# (source file caption from the original post: k8s_ut)
# Copyright 2019-present Lenovo
# Confidential and Proprietary

"""REST views for submitting, listing, rerunning and deleting Jupyter jobs."""

import base64
import calendar
import json
from copy import deepcopy
from os import path

from django.db import transaction
from rest_framework.response import Response
from rest_framework.views import APIView

from antilles.common.operator import get_operator
from antilles.devtools.models import Devtools
from antilles.scheduler.job_proxy import JobObjectsQuery

# Scheduler job fields that are merged into each Jupyter record in the list.
_JOB_COLUMNS = [
    "id", "status", "gpuscount", "endtime", "exechosts", "jobname",
    "cpuscount"
]


# NOTE(review): "Jupyetr" is a typo, but the class name is kept because
# other modules (presumably the URL configuration) may reference it.
class JupyetrSubmit(APIView):
    """Create a new Jupyter job from the posted parameters."""

    def post(self, request):
        """Submit a Jupyter job and return the operator's result.

        ``job_workspace`` and ``conda_env`` are joined onto the requesting
        user's workspace when non-empty. ``password`` is base64-decoded and
        passed through ``passwd`` before the job is created.
        """
        # Imported at call time so that patches applied to
        # antilles.devtools.utils.passwd (e.g. in tests) take effect.
        from antilles.devtools.utils import passwd

        # Work on a copy so request.data itself is never modified.
        base_parameters = deepcopy(request.data)
        workspace = request.user.workspace
        if base_parameters['job_workspace']:
            base_parameters['job_workspace'] = path.join(
                workspace, request.data['job_workspace']
            )
        if base_parameters.get('conda_env', ''):
            base_parameters['conda_env'] = path.join(
                workspace, request.data['conda_env']
            )
        base_parameters['password'] = passwd(
            base64.b64decode(base_parameters['password'])
        )

        jupyter_operator = get_operator().jupyter_operator(request.user)
        ret = jupyter_operator.create_jupyter_job(base_parameters)
        return Response(ret)


class JupyterList(APIView):
    """List the requesting user's Jupyter jobs."""

    def get(self, request):
        """Return ``{"data": [...]}`` with running jobs listed first.

        Each entry combines the stored Devtools record with the matching
        scheduler job (joined on the job id). ``exechosts`` is replaced by
        the value returned from the operator's ``queue_host``.
        """
        # pandas is imported lazily, as in the original implementation.
        from pandas import DataFrame, concat, merge

        jupyte_frame = DataFrame([
            {
                "language": jupyter.language,
                "create_time": calendar.timegm(
                    jupyter.create_time.timetuple()
                ),
                "port": jupyter.port,
                "id": jupyter.job,
                "job": jupyter.job,
                "jupyter_id": jupyter.id,
                "uuid": jupyter.uuid
            }
            for jupyter in Devtools.objects.filter(
                user=request.user
            ).iterator()
        ])
        if jupyte_frame.empty:
            return Response({"data": []})

        job_frame = DataFrame(JobObjectsQuery().filter_with_commit(
            id__in=jupyte_frame['job'].tolist(),
            submiter=request.user.username
        ))
        # No scheduler jobs came back: selecting the columns below would
        # raise KeyError on an empty frame, so answer with an empty list.
        if job_frame.empty:
            return Response({"data": []})

        res = merge(jupyte_frame, job_frame[_JOB_COLUMNS], on='id')
        if res.empty:
            return Response({"data": []})

        jupyter_operator = get_operator().jupyter_operator(request.user)
        res['exechosts'] = res.apply(jupyter_operator.queue_host, axis=1)

        # Running jobs first, everything else afterwards; the relative order
        # inside each group is preserved.
        is_running = res['status'].isin(['running'])
        ordered = concat([res[is_running], res[~is_running]])
        return Response({"data": ordered.to_dict(orient='records')})


class JupyterDetail(APIView):
    """Rerun or delete a single Jupyter job identified by its job id."""

    @staticmethod
    def _image_jupyter(user, pk):
        """Return the Devtools record of ``user`` whose job id is ``pk``."""
        return Devtools.objects.filter(user=user).get(job=pk)

    @staticmethod
    def _stored_parameters(pk):
        """Load the submit parameters saved in the job's ``json_body``."""
        job_info = JobObjectsQuery().get_with_commit(id=pk)
        return json.loads(job_info['json_body'])['parameters']

    def put(self, request, pk):
        """Resubmit the job with its stored parameters; return the result."""
        with transaction.atomic():
            jupyter = self._image_jupyter(request.user, pk)
            parameters = self._stored_parameters(pk)
            jupyter_operator = get_operator().jupyter_operator(request.user)
            ret = jupyter_operator.rerun_jupyter(parameters, jupyter)
        return Response(ret)

    def delete(self, request, pk):
        """Delete the Jupyter record when ``delete_completely`` is truthy.

        NOTE(review): when ``delete_completely`` is absent or falsy the
        record is looked up but nothing is removed - confirm this is
        intended.
        """
        confirm = request.data.get('delete_completely', False)
        jupyter = self._image_jupyter(request.user, pk)
        if confirm:
            parameters = self._stored_parameters(pk)
            jupyter_operator = get_operator().jupyter_operator(request.user)
            jupyter_operator.delete_jupyter(parameters, jupyter)
        return Response()

# (source file caption from the original post: views.py)
# Copyright 2019-present Lenovo
# Confidential and Proprietary

"""Operators that create, rerun and delete Jupyter jobs (host and kube)."""

import json
import operator  # noqa: F401  (retained from the original import list)
import socket
import uuid
from os import path

from six import raise_from
from six.moves.urllib_parse import urlparse

from antilles.common.operator import get_file_operator
from antilles.devtools.models import Devtools
from antilles.scheduler.job_proxy import JobManagerProxy
from antilles.scheduler_ext.exceptions import JobTemplateNotExist
from antilles.scheduler_ext.models import Template


class BaseDevtools(object):
    """Job-submission logic shared by the host and kube operators."""

    @staticmethod
    def request_create_job(user, base_parameters, processed_param):
        """Ask the job manager to create a job of type "jupyter".

        ``processed_param`` supplies the workspace, job name, job file
        content and (optionally) the log file. ``base_parameters`` is stored
        in the job's JSON body under "parameters".
        """
        return JobManagerProxy.request_create_job(
            workspace=processed_param["job_workspace"],
            jobname=processed_param['job_name'],
            jobfilecontent=processed_param['jobfilecontent'],
            type="jupyter",
            json_body=json.dumps(
                dict(parameters=base_parameters, template_id="jupyter")
            ),
            output_file=processed_param.get('job_log_file', ''),
            submitter=user
        )

    def submit_all_job(self, user, base_parameter):
        """Render the enabled 'jupyter' template and submit the job.

        Returns a ``(processed_param, ret)`` tuple: the rendered job file
        data and the job manager's response.

        Raises JobTemplateNotExist when no enabled 'jupyter' template exists.
        """
        # Only the lookup is guarded, so unrelated errors are not caught.
        try:
            template = Template.objects.filter(enable=True).get(
                code='jupyter'
            )
        except Template.DoesNotExist as e:
            raise_from(JobTemplateNotExist, e)
        processed_param = template.generate_jobfile(user, base_parameter)
        ret = self.request_create_job(user, base_parameter, processed_param)
        return processed_param, ret

    @staticmethod
    def _reset_conda_env(file_operator, user, parameters):
        """Remove and recreate the conda env directory named in parameters.

        Nothing happens when ``conda_env`` is empty or is not an existing
        directory.
        """
        conda_env_param = parameters.get('conda_env', '')
        if not conda_env_param:
            return
        conda_env = path.join(user.workspace, conda_env_param)
        # NOTE(review): the original formatting was lost; mkdir/chown are
        # assumed to belong inside this check - confirm against the repo.
        if file_operator.path_isdir(conda_env):
            file_operator.rmtree(conda_env)
            file_operator.mkdir(conda_env)
            file_operator.chown(conda_env, user.uid, user.gid)


class HostDevtools(BaseDevtools):
    """Jupyter operator that records the port reported by the job manager."""

    def __init__(self, user):
        self.user = user

    def create_jupyter_job(self, base_parameters):
        """Submit a new Jupyter job and store its Devtools record.

        A fresh uuid is added to ``base_parameters`` as ``env_uuid`` (the
        dict is modified in place). Returns the job manager's response.
        """
        # .hex works on Python 2 and 3; get_hex() exists on Python 2 only.
        env_uuid = uuid.uuid1().hex
        base_parameters.update(env_uuid=env_uuid)
        _, ret = self.submit_all_job(self.user, base_parameters)
        Devtools.objects.create(
            user=self.user,
            password=base_parameters['password'],
            port=list(ret['ports'].values())[0],
            language=base_parameters['language'],
            job=ret['id'],
            uuid=env_uuid
        )
        return ret

    def rerun_jupyter(self, base_parameters, jupyter):
        """Resubmit the job and copy the response onto ``jupyter``.

        ``job`` and ``port`` are added to the response, then every key
        except ``id`` is set as an attribute on the record before saving.
        """
        _, ret = self.submit_all_job(self.user, base_parameters)
        ret.update(job=ret['id'], port=list(ret['ports'].values())[0])
        for key, value in ret.items():
            # 'id' is skipped so the record's own primary key is untouched.
            if key != 'id':
                setattr(jupyter, key, value)
        jupyter.save()
        return ret

    @staticmethod
    def queue_host(item):
        """Return the encoded "ip:port" of a running job, else ''.

        The host name is the part of ``exechosts`` before the first "*".
        """
        # Imported at call time, as in the original implementation.
        from antilles.common.utils import encode_base64url

        if item['status'] != "running":
            return ''
        host_name = item['exechosts'].split("*")[0]
        return encode_base64url(
            "{}:{}".format(socket.gethostbyname(host_name), item['port'])
        )

    def delete_jupyter(self, parameters, jupyter):
        """Reset the conda env, remove the run directory, delete the record."""
        file_operator = get_file_operator(self.user)
        self._reset_conda_env(file_operator, self.user, parameters)
        run_dir = path.join(
            self.user.workspace,
            parameters['job_workspace'],
            '.' + jupyter.uuid + '_run'
        )
        if file_operator.path_isdir(run_dir):
            file_operator.rmtree(run_dir)
        jupyter.delete()


class KubeDevtools(BaseDevtools):
    """Jupyter operator that serves jobs through the kube ingress address."""

    def __init__(self, user):
        self.user = user

    def create_jupyter_job(self, base_parameters):
        """Submit a new Jupyter job and store its Devtools record.

        ``host``, ``base_url``, ``container_port`` and ``command`` are added
        to ``base_parameters`` in place. Returns the job manager's response.
        """
        base_url, port = self.get_ingress_url(self.user)
        base_parameters.update(
            host='lico.lenovo.com',
            base_url=base_url,
            container_port=6006,
            command='/opt/start'
        )
        processed_param, ret = self.submit_all_job(self.user, base_parameters)
        Devtools.objects.create(
            user=self.user,
            password=base_parameters['password'],
            port=int(port),
            language=base_parameters['language'],
            job=ret['id'],
            uuid='u' + processed_param['internal_name']
        )
        return ret

    @staticmethod
    def get_ingress_url(user):
        """Return ``(encoded_netloc, port)`` for the user's ingress address.

        ``port`` is an int and falls back to 80 when the address has none.
        """
        # Imported at call time so that patches applied to
        # antilles.kubernetes.utils.get_user_context take effect.
        from antilles.common.utils import encode_base64url
        from antilles.kubernetes.utils import get_user_context

        ingress_addr = get_user_context(user)['ingress_ctrl_addr']
        parsed = urlparse(ingress_addr)
        # urlparse(...).port replaces urllib.splitport, which exists only
        # on Python 2.
        port = 80 if parsed.port is None else parsed.port
        return encode_base64url(parsed.netloc), port

    def queue_host(self, _):
        """Return the encoded ingress netloc; the row argument is unused."""
        url, _port = self.get_ingress_url(self.user)
        return url

    def rerun_jupyter(self, base_parameters, jupyter):
        """Resubmit the job and update the record's job id and uuid."""
        processed_param, ret = self.submit_all_job(self.user, base_parameters)
        jupyter.job = ret['id']
        jupyter.uuid = 'u' + processed_param['internal_name']
        jupyter.save()
        return ret

    def delete_jupyter(self, parameters, jupyter):
        """Reset the conda env directory and delete the record."""
        file_operator = get_file_operator(self.user)
        self._reset_conda_env(file_operator, self.user, parameters)
        jupyter.delete()

# (source file caption from the original post: devtools.py)
标签:jupyter,self,job,新加,内容,user,operator,import 来源: https://www.cnblogs.com/crazymagic/p/11257326.html