华为公有云运维开发—— 基于FastAPI框架封装华为云VPC服务Restful APIs接口

使用已建好的运维开发环境,可用区为华北-北京4,在/root/huaweicloud_fastapi目录下创建main.py文件,基于FastAPI框架,完成虚拟私有云VPC管理服务的封装,端口为7045。
提示说明:华为云主机支持安装所需Python库。提交前答案前,需安装所开发程序所依赖的Python库。
(1)封装创建接口,路径为/cloud_vpc/create_vpc,参数格式为{"name": "","cidr": ""},name为VPC名称,cidr为虚拟私有云下可用子网的范围;方法为POST,返回数据类型为JSON类型。
(2)封装单个VPC查询接口,路径为/cloud_vpc/vpc/{vpc_name},路径中{vpc_name}为待查询VPC;方法为GET,返回数据类型为JSON类型。
(3)封装服务查询接口,要求可以查询到所有VPC,路径为/cloud_vpc/vpc;方法为GET,返回的数据类型JSON类型的列表类型。
(4)封装更新VPN名称接口,路径为/cloud_vpc/update_vpc,参数格式为{"new_name": "","old_name": ""},new_name为VPC新名称,old_name为待删除的VPN名称;方法为PUT,返回JSON类型的更新后数据。
(5)封装删除接口,接口名为/cloud_vpc/delete_vpc,参数格式为{"vpc_name": ""},vpc_name为VPC新名称;方法为DELETE,返回JSON类型的删除后的数据。
功能编写完成后,手工启动Restful服务,再提交检测。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @author YWLBTWTK
# @date 2023/9/18
from fastapi import FastAPI
import uvicorn
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkvpc.v2.region.vpc_region import VpcRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkvpc.v2 import *

app = FastAPI()
ak = "UCJEM0YMIIEGCS3SGW4R"
sk = "ueyWtMjX5o9KBbQkFKPWx14Jkiqbq4VQ2EQjacfn"
g_region = "cn-north-4"


def api_list_vpc(name):
    credentials = BasicCredentials(ak, sk)
    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of(g_region)) \
        .build()

    try:
        request = ListVpcsRequest()
        response = list(map(lambda x: {'name': x['name'], 'id': x['id']}, client.list_vpcs(request).to_dict()['vpcs']))
        for i in response:
            if i['name'] == name:
                print(i['id'])
                return i['id']
        return None
    except Exception:
        return None


@app.post('/cloud_vpc/create_vpc')
def api_create_vpc(data: dict):
    credentials = BasicCredentials(ak, sk)
    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of(g_region)) \
        .build()
    try:
        request = CreateVpcRequest()
        vpcbody = CreateVpcOption(
            cidr=data['cidr'],
            name=data['name']
        )
        request.body = CreateVpcRequestBody(
            vpc=vpcbody
        )
        response = client.create_vpc(request)
        print(response)
        return response
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)
        return e
    except Exception as e:
        return e


@app.get('/cloud_vpc/vpc/{vpc_name}')
def api_get_vpc(vpc_name: str):
    credentials = BasicCredentials(ak, sk)
    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of(g_region)) \
        .build()
    try:
        request = ShowVpcRequest()
        request.vpc_id = api_list_vpc(vpc_name)
        response = client.show_vpc(request)
        print(response)
        return response
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)
        return e
    except Exception as e:
        return e


@app.get('/cloud_vpc/vpc')
def api_get_list_vpc():
    credentials = BasicCredentials(ak, sk)
    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of(g_region)) \
        .build()
    try:
        request = ListVpcsRequest()
        response = client.list_vpcs(request).to_dict()
        print(response)
        return response
    except Exception as e:
        return e


@app.put('/cloud_vpc/update_vpc')
def api_update_vpc(data: dict):
    credentials = BasicCredentials(ak, sk)
    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of(g_region)) \
        .build()
    try:
        request = UpdateVpcRequest()
        request.vpc_id = api_list_vpc(data['old_name'])
        vpcbody = UpdateVpcOption(
            name=data['new_name']
        )
        request.body = UpdateVpcRequestBody(
            vpc=vpcbody
        )
        response = client.update_vpc(request)
        print(response)
        return response.to_dict()
    except Exception as e:
        return e


@app.delete('/cloud_vpc/delete_vpc')
def api_delete_vpc(data: dict):
    credentials = BasicCredentials(ak, sk)
    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of(g_region)) \
        .build()

    try:
        request = DeleteVpcRequest()
        request.vpc_id = api_list_vpc(data['vpc_name'])
        response = client.delete_vpc(request)
        print(response)
        return response.to_dict()
    except Exception as e:
        return e


if __name__ == '__main__':
    uvicorn.run(app='main:app', host='0.0.0.0', port=7045, reload=True)
该文章的评论已关闭