编程语言
首页 > 编程语言 > k8s Python API

k8s Python API

作者:互联网

主要参考 https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/ 中的文档说明。常用操作一般是创建 Pod 和 Service(svc),以及更新 Pod,直接上代码如下:

from kubernetes import client, config
from kubernetes.client.rest import ApiException

def main():
    """Connect to the API server with a bearer token and dump nodes and PVCs.

    Builds a ``client.Configuration`` by hand (host, bearer token, TLS
    verification off, debug on), registers it as the default configuration,
    then prints every node and every persistent volume claim returned by the
    API. The remaining demo helpers can be tried by uncommenting the calls at
    the end of this function.
    """
    # Option 1: store the token in a file with the commands below, then read it back.
    # Token=$(kubectl describe secret $(kubectl get secret -n kube-system | grep ^admin-user | awk '{print $1}') -n kube-system | grep -E '^token'| awk '{print $2}')
    # echo $Token
    # with open('token.txt', 'r') as file:
    #     Token = file.read().strip('\n')

    # Option 2: look up one specific token.
    # kubectl -n kube-system get secret | grep kubernetes-dashboard-token
    # kubectl describe -n kube-system secret/kubernetes-dashboard-token-xxx
    # https://github.com/kubernetes-client/python/blob/master/kubernetes/docs
    # NOTE(review): a bearer token is embedded in source here; for anything
    # beyond a throwaway demo, load it from a file as sketched in option 1.
    Token = 'eyJhbGciOiJSUzI1NiIsImtpZCI6IiJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJrdWJlLXN5c3RlbSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VjcmV0Lm5hbWUiOiJrdWJlcm5ldGVzLWRhc2hib2FyZC10b2tlbi05NzJ2NCIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50Lm5hbWUiOiJrdWJlcm5ldGVzLWRhc2hib2FyZCIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50LnVpZCI6ImJmODRjZDNjLTE2NGItNDk3My04NTU4LTY0YzMzNzYwNWQ4OSIsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDprdWJlLXN5c3RlbTprdWJlcm5ldGVzLWRhc2hib2FyZCJ9.ryNa8zyhOP8llAJBZGmbCnPzAvGJpdNtH0pYOTWDHH9VVlWDmyNBjrEKiOw4YMXC72JKQrG15tYzN2c3SbXR6qYQ0BeQCTlGhPBmRq1zyHLpOeFNtU2YhPC-iHgX3nLyjqj8iwlsAWxCItQN1bdSsanG3l5OpwSPd9WXALlw56d1Mtm6XCdqYP2NW58xAt8EOibQvzx8RJmbJK1HHswmfBTETxAeqFRq4FIeN3t_JjtdoI5qIyYUSFH3zPZp6IsyXDE_8sL6biUJDbSNCcoy6nv23gslnZ7jc6UKRRPgMW77SISg0E__UmbZ9C0JRKkHNIFRwGCfDwmuoeD46mtC_Q'

    APISERVER = 'https://192.168.100.11:6443'

    # Populate the configuration fully before registering it as the default,
    # so the default is set exactly once and already carries host and token.
    configuration = client.Configuration()
    configuration.host = APISERVER    # API server endpoint
    # NOTE(review): TLS certificate verification is turned off here.
    configuration.verify_ssl = False
    configuration.debug = True
    configuration.api_key = {"authorization": "Bearer " + Token}
    client.Configuration.set_default(configuration)

    v1 = client.CoreV1Api(client.ApiClient(configuration))
    nodes = v1.list_node(watch=False)
    for node in nodes.items:
        print("Node .....")
        print(node)

    pvc = v1.list_persistent_volume_claim_for_all_namespaces()
    for claim in pvc.items:
        print("PVC .....")
        print(claim)

    # Uncomment any of the calls below to run the corresponding demo helper.
    #ret = v1.list_namespaced_pod("kube-system")
    #list_pod_for_all_namespaces(v1)
    #create_namespace(v1)
    #delete_namespace(v1)
    #list_namespace(v1)
    #create_namespaced_deployment(configuration)
    #update_namespaced_deployment(configuration)
    #delete_namespaced_deployment(configuration)
    #create_namespaced_service(v1)
    #delete_namespaced_service(v1)
    

def list_pod_for_all_namespaces(v1):
    """Print the IP, namespace and name of every pod returned by the API.

    Args:
        v1: Object exposing ``list_pod_for_all_namespaces`` (the caller in
            this file passes a ``CoreV1Api`` client).
    """
    pod_list = v1.list_pod_for_all_namespaces(watch=False)
    print("Listing pods with their IPs:")
    # One (ip, namespace, name) row per pod, printed tab-separated.
    rows = (
        (pod.status.pod_ip, pod.metadata.namespace, pod.metadata.name)
        for pod in pod_list.items
    )
    for row in rows:
        print("%s\t%s\t%s" % row)

def create_namespace(v1):
    """Create the ``test123`` namespace and print the API response.

    Args:
        v1: Object exposing ``create_namespace`` (the caller in this file
            passes a ``CoreV1Api`` client).
    """
    metadata = {"name": "test123"}
    manifest = {"apiVersion": "v1", "kind": "Namespace", "metadata": metadata}
    response = v1.create_namespace(body=manifest)
    print("create_namespace result")
    print(response)

def delete_namespace(v1):
    """Delete the ``test123`` namespace with a zero grace period and print the response.

    Args:
        v1: Object exposing ``delete_namespace`` (the caller in this file
            passes a ``CoreV1Api`` client).
    """
    delete_options = client.V1DeleteOptions(
        api_version="v1",
        grace_period_seconds=0,
    )
    response = v1.delete_namespace("test123", body=delete_options)
    print("delete_namespace result")
    print(response)

def list_namespace(v1):
    """Print the name of every namespace returned by the API.

    An ``ApiException`` raised by the call is caught and reported on stdout
    instead of propagating.

    Args:
        v1: Object exposing ``list_namespace`` (the caller in this file
            passes a ``CoreV1Api`` client).
    """
    # All three arguments are optional for the API call:
    #   limit           - maximum number of items to return
    #   timeout_seconds - timeout for the request
    #   watch           - whether to watch the resource for changes
    query = {"limit": 56, "timeout_seconds": 56, "watch": False}
    try:
        namespaces = v1.list_namespace(**query)
        print("list_namespace result")
        for item in namespaces.items:
            print(item.metadata.name)
    except ApiException as e:
        print("Exception when calling CoreV1Api->list_namespace: %s\n" % e)

def create_namespaced_deployment(configuration):
    """Create the ``test1`` nginx Deployment in the ``default`` namespace.

    Prints the API response after the create call.

    Args:
        configuration: Client configuration used to build the ``AppsV1Api``
            client for this call.
    """
    v1 = client.AppsV1Api(client.ApiClient(configuration))
    # The manifest below is the dict form of this YAML. A YAML-to-JSON
    # converter such as http://www.json2yaml.com/ produces text that is
    # already valid as a Python dict literal, so no eval() is needed.
    #
    #   apiVersion: apps/v1
    #   kind: Deployment
    #   metadata:
    #     name: test1
    #   spec:
    #     selector:
    #       matchLabels:
    #         app: test
    #     replicas: 1
    #     template:
    #       metadata:
    #         labels:
    #           app: test
    #       spec:
    #         containers:
    #           - name: test
    #             image: nginx:latest
    #             imagePullPolicy: IfNotPresent
    #             ports:
    #               - containerPort: 80
    body = {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": "test1"},
        "spec": {
            "selector": {"matchLabels": {"app": "test"}},
            "replicas": 1,
            "template": {
                "metadata": {"labels": {"app": "test"}},
                "spec": {
                    "containers": [
                        {
                            "name": "test",
                            "image": "nginx:latest",
                            "imagePullPolicy": "IfNotPresent",
                            "ports": [{"containerPort": 80}],
                        }
                    ]
                },
            },
        },
    }
    resp = v1.create_namespaced_deployment(body=body, namespace="default")
    print("create_namespaced_deployment result")
    print(resp)


def update_namespaced_deployment(configuration):
    """Patch the ``test1`` Deployment in ``default``, setting replicas to 2.

    Sends the full Deployment manifest as the patch body and prints the API
    response.

    Args:
        configuration: Client configuration used to build the ``AppsV1Api``
            client for this call.
    """
    v1 = client.AppsV1Api(client.ApiClient(configuration))
    # Plain dict literal instead of eval() on a JSON string: same value,
    # without evaluating text as code.
    body = {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": "test1"},
        "spec": {
            "selector": {"matchLabels": {"app": "test"}},
            "replicas": 2,
            "template": {
                "metadata": {"labels": {"app": "test"}},
                "spec": {
                    "containers": [
                        {
                            "name": "test",
                            "image": "nginx:latest",
                            "imagePullPolicy": "IfNotPresent",
                            "ports": [{"containerPort": 80}],
                        }
                    ]
                },
            },
        },
    }
    resp = v1.patch_namespaced_deployment(name="test1", namespace="default", body=body)
    print("patch_namespaced_deployment result")
    print(resp)

def delete_namespaced_deployment(configuration):
    """Delete the ``test1`` Deployment from ``default`` and print the response.

    The delete options request foreground propagation and a zero grace period.

    Args:
        configuration: Client configuration used to build the ``AppsV1Api``
            client for this call.
    """
    apps_api = client.AppsV1Api(client.ApiClient(configuration))
    delete_options = client.V1DeleteOptions(
        propagation_policy='Foreground',
        grace_period_seconds=0,
    )
    response = apps_api.delete_namespaced_deployment(
        name="test1",
        namespace="default",
        body=delete_options,
    )
    print("delete_namespaced_deployment result")
    print(response)

def create_namespaced_service(v1):
    """Create the ``nginx-service`` Service in ``default`` and print the response.

    An ``ApiException`` raised by the call is caught and reported on stdout
    instead of propagating.

    Args:
        v1: Object exposing ``create_namespaced_service`` (the caller in this
            file passes a ``CoreV1Api`` client).
    """
    target_namespace = "default"
    manifest = {
        'apiVersion': 'v1',
        'kind': 'Service',
        'metadata': {'name': 'nginx-service', 'labels': {'app': 'nginx'}},
        'spec': {
            'ports': [{'port': 80, 'targetPort': 80}],
            'selector': {'app': 'nginx'},
        },
    }
    try:
        created = v1.create_namespaced_service(target_namespace, manifest)
        print(created)
    except ApiException as e:
        print("Exception when calling CoreV1Api->create_namespaced_service: %s\n" % e)

def delete_namespaced_service(v1):
    """Delete the ``nginx-service`` Service from ``default`` and print the response.

    An ``ApiException`` raised by the call is caught and reported on stdout
    instead of propagating.

    Args:
        v1: Object exposing ``delete_namespaced_service`` (the caller in this
            file passes a ``CoreV1Api`` client).
    """
    # (name of the Service to delete, namespace it lives in)
    service_name, service_namespace = 'nginx-service', 'default'
    try:
        deleted = v1.delete_namespaced_service(service_name, service_namespace)
        print(deleted)
    except ApiException as e:
        print("Exception when calling CoreV1Api->delete_namespaced_service: %s\n" % e)

    
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

 

标签:body,namespaced,configuration,Python,namespace,v1,API,print,k8s
来源: https://www.cnblogs.com/majiang/p/14614686.html