k8s Python API
作者:互联网
主要参考 https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/ 中的官方说明。一般主要是创建 Pod(通过 Deployment)和 Service,以及更新 Pod(更新 Deployment),直接上代码如下:
import os

from kubernetes import client, config
from kubernetes.client.rest import ApiException

# API server address used when the K8S_APISERVER environment variable is unset.
DEFAULT_APISERVER = 'https://192.168.100.11:6443'

# File the bearer token is read from when K8S_TOKEN is unset; override the
# location with K8S_TOKEN_FILE.
DEFAULT_TOKEN_FILE = 'token.txt'


def _load_token():
    """Return the service-account bearer token used to authenticate.

    The token is deliberately NOT hard-coded in the source.  It is taken from
    the ``K8S_TOKEN`` environment variable, or, when that is unset or empty,
    read from the file named by ``K8S_TOKEN_FILE`` (default ``token.txt``).

    Ways to obtain a token:

    Method 1 - store the admin-user token in a file::

        Token=$(kubectl describe secret $(kubectl get secret -n kube-system | grep ^admin-user | awk '{print $1}') -n kube-system | grep -E '^token'| awk '{print $2}')
        echo $Token > token.txt

    Method 2 - look up a specific token::

        kubectl -n kube-system get secret | grep kubernetes-dashboard-token
        kubectl describe -n kube-system secret/kubernetes-dashboard-token-xxx

    Raises:
        RuntimeError: if no token is configured, the token file cannot be
            read, or the token file is empty.
    """
    token = os.environ.get("K8S_TOKEN", "").strip()
    if token:
        return token

    token_path = os.environ.get("K8S_TOKEN_FILE", DEFAULT_TOKEN_FILE)
    try:
        with open(token_path, "r") as token_file:
            token = token_file.read().strip()
    except OSError as err:
        raise RuntimeError(
            "No K8S_TOKEN set and token file %r could not be read" % token_path
        ) from err
    if not token:
        raise RuntimeError("Token file %r is empty" % token_path)
    return token


def _test_deployment_manifest(replicas):
    """Build the ``test1`` Deployment manifest with the given replica count.

    Equivalent YAML::

        apiVersion: apps/v1
        kind: Deployment
        metadata:
          name: test1
        spec:
          selector:
            matchLabels:
              app: test
          replicas: <replicas>
          template:
            metadata:
              labels:
                app: test
            spec:
              containers:
              - name: test
                image: nginx:latest
                imagePullPolicy: IfNotPresent
                ports:
                - containerPort: 80

    A plain dict literal is used instead of ``eval()`` on a JSON string: the
    result is the same, without executing text as code.
    """
    return {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": "test1"},
        "spec": {
            "selector": {"matchLabels": {"app": "test"}},
            "replicas": replicas,
            "template": {
                "metadata": {"labels": {"app": "test"}},
                "spec": {
                    "containers": [
                        {
                            "name": "test",
                            "image": "nginx:latest",
                            "imagePullPolicy": "IfNotPresent",
                            "ports": [{"containerPort": 80}],
                        }
                    ]
                },
            },
        },
    }


def main():
    """Connect to the API server with a bearer token and dump nodes and PVCs.

    The API server address comes from ``K8S_APISERVER`` (falling back to
    ``DEFAULT_APISERVER``) and the token from ``_load_token()``.  The other
    demo functions in this file are listed at the end of the body; uncomment
    the ones you want to run.
    """
    token = _load_token()

    configuration = client.Configuration()
    configuration.host = os.environ.get("K8S_APISERVER", DEFAULT_APISERVER)
    # WARNING: certificate verification is disabled, as in the original demo.
    # This is only acceptable against a throw-away test cluster; for anything
    # else keep verification on and point the client at the cluster CA.
    configuration.verify_ssl = False
    configuration.debug = True
    configuration.api_key = {"authorization": "Bearer " + token}
    # Register the configuration as the default once, after it is fully set up.
    client.Configuration.set_default(configuration)

    v1 = client.CoreV1Api(client.ApiClient(configuration))

    nodes = v1.list_node(watch=False)
    for node in nodes.items:
        print("Node .....")
        print(node)

    pvc = v1.list_persistent_volume_claim_for_all_namespaces()
    for claim in pvc.items:
        print("PVC .....")
        print(claim)

    # Further demos - uncomment as needed:
    # ret = v1.list_namespaced_pod("kube-system")
    # list_pod_for_all_namespaces(v1)
    # create_namespace(v1)
    # delete_namespace(v1)
    # list_namespace(v1)
    # create_namespaced_deployment(configuration)
    # update_namespaced_deployment(configuration)
    # delete_namespaced_deployment(configuration)
    # create_namespaced_service(v1)
    # delete_namespaced_service(v1)


def list_pod_for_all_namespaces(v1):
    """Print pod IP, namespace and name for every pod in the cluster.

    Args:
        v1: a ``client.CoreV1Api`` instance.
    """
    ret = v1.list_pod_for_all_namespaces(watch=False)
    print("Listing pods with their IPs:")
    for pod in ret.items:
        print("%s\t%s\t%s" % (pod.status.pod_ip, pod.metadata.namespace, pod.metadata.name))


def create_namespace(v1):
    """Create the ``test123`` namespace and print the API response.

    Args:
        v1: a ``client.CoreV1Api`` instance.
    """
    bodynamespace = {
        "apiVersion": "v1",
        "kind": "Namespace",
        "metadata": {
            "name": "test123",
        },
    }
    ret = v1.create_namespace(body=bodynamespace)
    print("create_namespace result")
    print(ret)


def delete_namespace(v1):
    """Delete the ``test123`` namespace with a zero grace period.

    Args:
        v1: a ``client.CoreV1Api`` instance.
    """
    body = client.V1DeleteOptions()
    body.api_version = "v1"
    body.grace_period_seconds = 0
    ret = v1.delete_namespace("test123", body=body)
    print("delete_namespace result")
    print(ret)


def list_namespace(v1):
    """Print the name of every namespace, reporting API errors instead of raising.

    Args:
        v1: a ``client.CoreV1Api`` instance.
    """
    limit = 56  # maximum number of results to return (optional parameter)
    timeout_seconds = 56  # request timeout (optional parameter)
    watch = False  # watch the resource for changes (optional parameter)
    try:
        api_response = v1.list_namespace(limit=limit, timeout_seconds=timeout_seconds, watch=watch)
        print("list_namespace result")
        for namespace in api_response.items:
            print(namespace.metadata.name)
    except ApiException as e:
        print("Exception when calling CoreV1Api->list_namespace: %s\n" % e)


def create_namespaced_deployment(configuration):
    """Create the ``test1`` Deployment (1 replica) in the ``default`` namespace.

    Args:
        configuration: the ``client.Configuration`` used to build the API client.
    """
    v1 = client.AppsV1Api(client.ApiClient(configuration))
    body = _test_deployment_manifest(replicas=1)
    resp = v1.create_namespaced_deployment(body=body, namespace="default")
    print("create_namespaced_deployment result")
    print(resp)


def update_namespaced_deployment(configuration):
    """Patch the ``test1`` Deployment in ``default`` to run 2 replicas.

    Args:
        configuration: the ``client.Configuration`` used to build the API client.
    """
    v1 = client.AppsV1Api(client.ApiClient(configuration))
    body = _test_deployment_manifest(replicas=2)
    resp = v1.patch_namespaced_deployment(name="test1", namespace="default", body=body)
    print("patch_namespaced_deployment result")
    print(resp)


def delete_namespaced_deployment(configuration):
    """Delete the ``test1`` Deployment from the ``default`` namespace.

    Uses ``propagation_policy='Foreground'`` and a zero grace period.

    Args:
        configuration: the ``client.Configuration`` used to build the API client.
    """
    v1 = client.AppsV1Api(client.ApiClient(configuration))
    body = client.V1DeleteOptions(propagation_policy='Foreground', grace_period_seconds=0)
    resp = v1.delete_namespaced_deployment(name="test1", namespace="default", body=body)
    print("delete_namespaced_deployment result")
    print(resp)


def create_namespaced_service(v1):
    """Create the ``nginx-service`` Service in the ``default`` namespace.

    API errors are printed rather than raised.

    Args:
        v1: a ``client.CoreV1Api`` instance.
    """
    namespace = "default"
    # NOTE(review): the selector is app=nginx, but the Deployment created by
    # create_namespaced_deployment labels its pods app=test, so this Service
    # will not select those pods as written - confirm which label is intended.
    body = {
        'apiVersion': 'v1',
        'kind': 'Service',
        'metadata': {'name': 'nginx-service', 'labels': {'app': 'nginx'}},
        'spec': {
            'ports': [{'port': 80, 'targetPort': 80}],
            'selector': {'app': 'nginx'},
        },
    }
    try:
        api_response = v1.create_namespaced_service(namespace, body)
        print(api_response)
    except ApiException as e:
        print("Exception when calling CoreV1Api->create_namespaced_service: %s\n" % e)


def delete_namespaced_service(v1):
    """Delete the ``nginx-service`` Service from the ``default`` namespace.

    API errors are printed rather than raised.

    Args:
        v1: a ``client.CoreV1Api`` instance.
    """
    name = 'nginx-service'  # name of the Service to delete
    namespace = 'default'  # namespace the Service lives in
    try:
        api_response = v1.delete_namespaced_service(name, namespace)
        print(api_response)
    except ApiException as e:
        print("Exception when calling CoreV1Api->delete_namespaced_service: %s\n" % e)


if __name__ == '__main__':
    main()
标签:body,namespaced,configuration,Python,namespace,v1,API,print,k8s 来源: https://www.cnblogs.com/majiang/p/14614686.html