Django接口自动化平台实现(五)
8. 测试用例执行
预期效果如下:
用例执行逻辑如下:
- 前端提交用例 id 列表到后台,后台获取每一条用例的信息;
- 后台获取域名信息、用例 id 列表;
- 对用例的请求数据进行变量的参数化、函数化等预处理操作;
- 根据先后顺序进行接口请求,并对响应数据进行断言;
- 根据用例中的提取变量表达式,从断言成功的响应数据中提取关联变量值用于后续用例使用。
8.1 修改测试用例页模板文件:前端提交用例信息
templates/test_case.html:
1 {% extends \'base.html\' %} 2 {% load static %} 3 {% block title %}测试用例{% endblock %} 4 5 {% block content %} 6 7 //页面加载的时候,所有的复选框都是未选中的状态 8 function checkOrCancelAll() { 9 var all_check = document.getElementById(\"all_check\"); //1.获取all的元素对象 10 var all_check = all_check.checked; //2.获取选中状态 11 //3.若checked=true,将所有的复选框选中;checked=false,将所有的复选框取消 12 var allCheck = document.getElementsByName(\"test_cases_list\"); 13 //4.循环遍历取出每一个复选框中的元素 14 if (all_check)//全选 15 { 16 for (var i = 0; i < allCheck.length; i++) { 17 //设置复选框的选中状态 18 allCheck[i].checked = true; 19 } 20 } else//取消全选 21 { 22 for (var i = 0; i < allCheck.length; i++) { 23 allCheck[i].checked = false; 24 } 25 } 26 } 27 28 function ischecked() { 29 //3.若checked=true,将所有的复选框选中,checked=false,将所有的复选框取消 30 var allCheck = document.getElementsByName(\"test_cases_list\"); 31 for (var i = 0; i < allCheck.length; i++) { 32 if (allCheck[i].checked == true) { 33 alert(\"所需执行的测试用例提交成功!\"); 34 return true 35 } 36 } 37 alert(\"请选择要执行的测试用例!\") 38 return false 39 } 40 41 42 43 44 {% csrf_token %} 45 46 运行环境: 47 48 dev 49 prod 50 515285 86 {# 实现分页标签的代码 #} 87 {# 这里使用 bootstrap 渲染页面 #} 8853 54
83 8455 65 66 67 68 {% for test_case in test_cases %} 69全选 56用例名称 57所属项目 58所属模块 59接口地址 60请求方式 61请求数据 62断言key 63提取变量表达式 6470 80 {% endfor %} 81 82<input type=\"checkbox\" value=\"{{ test_case.id }}\" name=\"test_cases_list\"> {{ test_case.id }} 71{{ test_case.case_name }} 72{{ test_case.belong_project.name }} 73{{ test_case.belong_module.name }} 74{{ test_case.uri }} 75{{ test_case.request_method }} 76{{ test_case.request_data }} 77{{ test_case.assert_key }} 78{{ test_case.extract_var }} 7989 105106 {% endblock %}
8.2 定义接口地址模型类
models.py:
1 from django.db import models 2 from smart_selects.db_fields import GroupedForeignKey # pip install django-smart-selects:后台级联选择 3 from django.contrib.auth.models import User 4 5 6 class Project(models.Model): 7 id = models.AutoField(primary_key=True) 8 name = models.CharField(\'项目名称\', max_length=50, unique=True, null=False) 9 proj_owner = models.CharField(\'项目负责人\', max_length=20, null=False) 10 test_owner = models.CharField(\'测试负责人\', max_length=20, null=False) 11 dev_owner = models.CharField(\'开发负责人\', max_length=20, null=False) 12 desc = models.CharField(\'项目描述\', max_length=100, null=True) 13 create_time = models.DateTimeField(\'项目创建时间\', auto_now_add=True) 14 update_time = models.DateTimeField(\'项目更新时间\', auto_now=True, null=True) 15 16 def __str__(self): 17 return self.name 18 19 class Meta: 20 verbose_name = \'项目信息表\' 21 verbose_name_plural = \'项目信息表\' 22 23 24 class Module(models.Model): 25 id = models.AutoField(primary_key=True) 26 name = models.CharField(\'模块名称\', max_length=50, null=False) 27 belong_project = models.ForeignKey(Project, on_delete=models.CASCADE) 28 test_owner = models.CharField(\'测试负责人\', max_length=50, null=False) 29 desc = models.CharField(\'简要描述\', max_length=100, null=True) 30 create_time = models.DateTimeField(\'创建时间\', auto_now_add=True) 31 update_time = models.DateTimeField(\'更新时间\', auto_now=True, null=True) 32 33 def __str__(self): 34 return self.name 35 36 class Meta: 37 verbose_name = \'模块信息表\' 38 verbose_name_plural = \'模块信息表\' 39 40 41 class TestCase(models.Model): 42 id = models.AutoField(primary_key=True) 43 case_name = models.CharField(\'用例名称\', max_length=50, null=False) # 如 register 44 belong_project = models.ForeignKey(Project, on_delete=models.CASCADE, verbose_name=\'所属项目\') 45 belong_module = GroupedForeignKey(Module, \"belong_project\", on_delete=models.CASCADE, verbose_name=\'所属模块\') 46 request_data = models.CharField(\'请求数据\', max_length=1024, null=False, default=\'\') 47 uri = models.CharField(\'接口地址\', max_length=1024, null=False, default=\'\') 48 assert_key = models.CharField(\'断言内容\', max_length=1024, null=True) 49 maintainer = models.CharField(\'编写人员\', max_length=1024, null=False, default=\'\') 50 extract_var = models.CharField(\'提取变量表达式\', max_length=1024, null=True) # 示例:userid||userid\": (\\d+) 51 request_method = models.CharField(\'请求方式\', max_length=1024, null=True) 52 status = models.IntegerField(null=True, help_text=\"0:表示有效,1:表示无效,用于软删除\") 53 created_time = models.DateTimeField(\'创建时间\', auto_now_add=True) 54 updated_time = models.DateTimeField(\'更新时间\', auto_now=True, null=True) 55 user = models.ForeignKey(User, on_delete=models.CASCADE, verbose_name=\'责任人\', null=True) 56 57 def __str__(self): 58 return self.case_name 59 60 class Meta: 61 verbose_name = \'测试用例表\' 62 verbose_name_plural = \'测试用例表\' 63 64 65 class CaseSuite(models.Model): 66 id = models.AutoField(primary_key=True) 67 suite_desc = models.CharField(\'用例集合描述\', max_length=100, blank=True, null=True) 68 if_execute = models.IntegerField(verbose_name=\'是否执行\', null=False, default=0, help_text=\'0:执行;1:不执行\') 69 test_case_model = models.CharField(\'测试执行模式\', max_length=100, blank=True, null=True, help_text=\'data/keyword\') 70 creator = models.CharField(max_length=50, blank=True, null=True) 71 create_time = models.DateTimeField(\'创建时间\', auto_now=True) # 创建时间-自动获取当前时间 72 73 class Meta: 74 verbose_name = \"用例集合表\" 75 verbose_name_plural = \'用例集合表\' 76 77 78 class SuiteCase(models.Model): 79 id = models.AutoField(primary_key=True) 80 case_suite = models.ForeignKey(CaseSuite, on_delete=models.CASCADE, verbose_name=\'用例集合\') 81 test_case = models.ForeignKey(TestCase, on_delete=models.CASCADE, verbose_name=\'测试用例\') 82 status = models.IntegerField(verbose_name=\'是否有效\', null=False, default=1, help_text=\'0:有效,1:无效\') 83 create_time = models.DateTimeField(\'创建时间\', auto_now=True) # 创建时间-自动获取当前时间 84 85 86 class InterfaceServer(models.Model): 87 id = models.AutoField(primary_key=True) 88 env = models.CharField(\'环境\', max_length=50, null=False, default=\'\') 89 ip = models.CharField(\'ip\', max_length=50, null=False, default=\'\') 90 port = models.CharField(\'端口\', max_length=100, null=False, default=\'\') 91 remark = models.CharField(\'备注\', max_length=100, null=True) 92 create_time = models.DateTimeField(\'创建时间\', auto_now_add=True) 93 update_time = models.DateTimeField(\'更新时间\', auto_now=True, null=True) 94 95 def __str__(self): 96 return self.env 97 98 class Meta: 99 verbose_name = \'接口地址配置表\'100 verbose_name_plural = \'接口地址配置表\'
执行数据迁移:
python manage.py makemigrationspython manage.py migrate
admin.py:
1 from django.contrib import admin 2 from .import models 3 4 5 class ProjectAdmin(admin.ModelAdmin): 6 list_display = (\"id\", \"name\", \"proj_owner\", \"test_owner\", \"dev_owner\", \"desc\", \"create_time\", \"update_time\") 7 8 admin.site.register(models.Project, ProjectAdmin) 9 10 11 class ModuleAdmin(admin.ModelAdmin):12 list_display = (\"id\", \"name\", \"belong_project\", \"test_owner\", \"desc\", \"create_time\", \"update_time\")13 14 admin.site.register(models.Module, ModuleAdmin)15 16 17 class TestCaseAdmin(admin.ModelAdmin):18 list_display = (19 \"id\", \"case_name\", \"belong_project\", \"belong_module\", \"request_data\", \"uri\", \"assert_key\", \"maintainer\",20 \"extract_var\", \"request_method\", \"status\", \"created_time\", \"updated_time\", \"user\")21 22 admin.site.register(models.TestCase, TestCaseAdmin)23 24 25 class CaseSuiteAdmin(admin.ModelAdmin):26 list_display = (\"id\", \"suite_desc\", \"creator\", \"create_time\")27 28 admin.site.register(models.CaseSuite, CaseSuiteAdmin)29 30 31 class InterfaceServerAdmin(admin.ModelAdmin):32 list_display = (\"id\", \"env\", \"ip\", \"port\", \"remark\", \"create_time\")33 34 admin.site.register(models.InterfaceServer, InterfaceServerAdmin)
登录 admin 系统,添加地址配置数据:
8.3 修改测试用例视图函数,后台执行用例
1)Redis 持久化递增唯一数
在本工程中,我们使用 Redis 来维护一个每次调用函数来就会递增的数值,供注册接口的注册用户名拼接使用,避免注册接口请求数据重复使用问题。
1.1)Redis 持久化配置
修改 redis.windows.conf:
appendonly yes # 每次更新操作后进行日志记录appendfsync everysec # 每秒同步一次(默认值)
1.2)启动 Redis 服务端:
redis-server.exe redis.windows.conf
2)请求/响应数据处理
在应用目录下新建 utils 包,用于封装接口请求的相关函数。
data_process.py
该模块实现了对接口请求的所需工具函数,如获取递增唯一数(供注册用户名使用)、md5 加密(用于登录密码加密)、请求数据预处理、响应数据断言等功能。
- get_unique_num_value():用于获取每次递增的唯一数
- 该函数的目标是解决注册用户名重复的问题。
- 虽然可以在赋值注册用户名变量时,采用前缀字符串拼接随机数的方式,但是用随机数的方式仍然是有可能出现用户名重复的情况。因此,可以在单独的一个文件中维护一个数字,每次请求注册接口之前,先读取该文件中的数字,拼接用户名前缀字符串。读取完之后,再把这个数字进行加一的操作并保存,即每读取一次这个数字之后,就做一次修改,进而保证每次拼接的用户名都是唯一的,避免出现因为用户名重复导致用例执行失败的情况。
- data_preprocess():对请求数据进行预处理:参数化及函数化。
- data_postprocess():将响应数据需要关联的参数保存进全局变量,供后续接口使用。
- assert_result():对响应数据进行关键字断言。
1 import re 2 import hashlib 3 import os 4 import json 5 import traceback 6 import redis 7 from InterfaceAutoTest.settings import redis_port 8 9 10 # 连接redis 11 pool = redis.ConnectionPool(host=\'localhost\', port=redis_port, decode_responses=True) 12 redis_obj = redis.Redis(connection_pool=pool) 13 14 15 # 初始化框架工程中的全局变量,存储在测试数据中的唯一值数据 16 # 框架工程中若要使用字典中的任意一个变量,则每次使用后,均需要将字典中的value值进行加1操作。 17 def get_unique_number_value(unique_number): 18 data = None 19 try: 20 redis_value = redis_obj.get(unique_number) # {\"unique_number\": 666} 21 if redis_value: 22 data = redis_value 23 print(\"全局唯一数当前生成的值是:%s\" % data) 24 # 把redis中key为unique_number的值进行加一操作,以便下提取时保持唯一 25 redis_obj.set(unique_number, int(redis_value) + 1) 26 else: 27 data = 1000 # 初始化递增数值 28 redis_obj.set(unique_number, data) 29 except Exception as e: 30 print(\"获取全局唯一数变量值失败,请求的全局唯一数变量是%s,异常原因如下:%s\" % (unique_number, traceback.format_exc())) 31 data = None 32 finally: 33 return data 34 35 36 def md5(s): 37 m5 = hashlib.md5() 38 m5.update(s.encode(\"utf-8\")) 39 md5_value = m5.hexdigest() 40 return md5_value 41 42 43 # 请求数据预处理:参数化、函数化 44 # 将请求数据中包含的${变量名}的字符串部分,替换为唯一数或者全局变量字典中对应的全局变量 45 def data_preprocess(global_key, requestData): 46 try: 47 # 匹配注册用户名参数,即\"${unique_num...}\"的格式,并取出本次请求的随机数供后续接口的用户名参数使用 48 if re.search(r\"\\$\\{unique_num\\d+\\}\", requestData): 49 var_name = re.search(r\"\\$\\{(unique_num\\d+)\\}\", requestData).group(1) # 获取用户名参数 50 print(\"用户名变量:%s\" % var_name) 51 var_value = get_unique_number_value(var_name) 52 print(\"用户名变量值: %s\" % var_value) 53 requestData = re.sub(r\"\\$\\{unique_num\\d+\\}\", str(var_value), requestData) 54 var_name = var_name.split(\"_\")[1] 55 print(\"关联的用户名变量: %s\" % var_name) 56 # \"xxxkey\" : \"{\'var_name\': var_value}\" 57 global_var = json.loads(os.environ[global_key]) 58 global_var[var_name] = var_value 59 os.environ[global_key] = json.dumps(global_var) 60 print(\"用户名唯一数参数化后的全局变量【os.environ[global_key]】: {}\".format(os.environ[global_key])) 61 # 函数化,如密码加密\"${md5(...)}\"的格式 62 if re.search(r\"\\$\\{\\w+\\(.+\\)\\}\", requestData): 63 var_pass = re.search(r\"\\$\\{(\\w+\\(.+\\))\\}\", requestData).group(1) # 获取密码参数 64 print(\"需要函数化的变量: %s\" % var_pass) 65 print(\"函数化后的结果: %s\" % eval(var_pass)) 66 requestData = re.sub(r\"\\$\\{\\w+\\(.+\\)\\}\", eval(var_pass), requestData) # 将requestBody里面的参数内容通过eval修改为实际变量值 67 print(\"函数化后的请求数据: %s\" % requestData) # requestBody是拿到的请求时发送的数据 68 # 其余变量参数化 69 if re.search(r\"\\$\\{(\\w+)\\}\", requestData): 70 print(\"需要参数化的变量: %s\" % (re.findall(r\"\\$\\{(\\w+)\\}\", requestData))) 71 for var_name in re.findall(r\"\\$\\{(\\w+)\\}\", requestData): 72 requestData = re.sub(r\"\\$\\{%s\\}\" % var_name, str(json.loads(os.environ[global_key])[var_name]), requestData) 73 print(\"变量参数化后的最终请求数据: %s\" % requestData) 74 print(\"数据参数后的最终全局变量【os.environ[global_key]】: {}\".format(os.environ[global_key])) 75 return 0, requestData, \"\" 76 except Exception as e: 77 print(\"请求数据预处理发生异常,error:{}\".format(traceback.format_exc())) 78 return 1, {}, traceback.format_exc() 79 80 81 # 响应数据提取关联参数 82 def data_postprocess(global_key, response_data, extract_var): 83 print(\"需提取的关联变量:%s\" % extract_var) 84 var_name = extract_var.split(\"||\")[0] 85 print(\"关联变量名:%s\" % var_name) 86 regx_exp = extract_var.split(\"||\")[1] 87 print(\"关联变量正则:%s\" % regx_exp) 88 if re.search(regx_exp, response_data): 89 global_vars = json.loads(os.environ[global_key]) 90 print(\"关联前的全局变量:{}\".format(global_vars)) 91 global_vars[var_name] = re.search(regx_exp, response_data).group(1) 92 os.environ[global_key] = json.dumps(global_vars) 93 print(\"关联前的全局变量:{}\".format(os.environ[global_key])) 94 return 95 96 97 # 响应数据 断言处理 98 def assert_result(response_obj, key_word): 99 try:100 # 多个断言关键字101 if \'&&\' in key_word:102 key_word_list = key_word.split(\'&&\')103 print(\"断言关键字列表:%s\" % key_word_list)104 # 断言结果标识符105 flag = True106 exception_info = \'\'107 # 遍历分隔出来的断言关键词列表108 for key_word in key_word_list:109 # 如果断言词非空,则进行断言110 if key_word:111 # 没查到断言词则认为是断言失败112 if not (key_word in json.dumps(response_obj.json(), ensure_ascii=False)):113 print(\"断言关键字【{}】匹配失败\".format(key_word))114 flag = False # 只要有一个断言词匹配失败,则整个接口断言失败115 exception_info = \"keyword: {} not matched from response, assert failed\".format(key_word)116 else:117 print(\"断言关键字【{}】匹配成功\".format(key_word))118 if flag:119 print(\"接口断言成功!\")120 else:121 print(\"接口断言失败!\")122 return flag, exception_info123 # 单个断言关键字124 else:125 if key_word in json.dumps(response_obj.json(), ensure_ascii=False):126 print(\"接口断言【{}】匹配成功!\".format(key_word))127 return True, \'\'128 else:129 print(\"接口断言【{}】匹配失败!\".format(key_word))130 return False, \'\'131 except Exception as e:132 return False, traceback.format_exc()133 134 135 # 测试代码136 if __name__ == \"__main__\":137 print(get_unique_number_value(\"unique_num1\"))
request_process.py
该模块实现了对接口请求的封装。
1 import requests 2 import json 3 # from Util.Log import logger 4 5 6 # 此函数封装了get请求、post和put请求的方法 7 def request_process(url, request_method, request_content): 8 print(\"-------- 开始调用接口 --------\") 9 if request_method == \"get\":10 try:11 if isinstance(request_content, dict):12 print(\"接口地址:%s\" % url)13 print(\"请求数据:%s\" % request_content)14 r = requests.get(url, params=json.dumps(request_content))15 else:16 r = requests.get(url+str(request_content))17 print(\"接口地址:%s\" % r.url)18 print(\"请求数据:%s\" % request_content)19 20 except Exception as e:21 print(\"get方法请求发生异常:请求的url是%s, 请求的内容是%s\\n发生的异常信息如下:%s\" % (url, request_content, e))22 r = None23 return r24 elif request_method == \"post\":25 try:26 if isinstance(request_content, dict):27 print(\"接口地址:%s\" % url)28 print(\"请求数据:%s\" % json.dumps(request_content))29 r = requests.post(url, data=json.dumps(request_content))30 else:31 raise ValueError32 except ValueError as e:33 print(\"post方法请求发生异常:请求的url是%s, 请求的内容是%s\\n发生的异常信息如下:%s\" % (url, request_content, \"请求参数不是字典类型\"))34 r = None35 except Exception as e:36 print(\"post方法请求发生异常:请求的url是%s, 请求的内容是%s\\n发生的异常信息如下:%s\" % (url, request_content, e))37 r = None38 return r39 elif request_method == \"put\":40 try:41 if isinstance(request_content, dict):42 print(\"接口地址:%s\" % url)43 print(\"请求数据:%s\" % json.dumps(request_content))44 r = requests.put(url, data=json.dumps(request_content))45 else:46 raise ValueError47 except ValueError as e:48 print(\"put方法请求发生异常:请求的url是%s, 请求的内容是%s\\n发生的异常信息如下:%s\" % (url, request_content, \"请求参数不是字典类型\"))49 r = None50 except Exception as e:51 print(\"put方法请求发生异常:请求的url是%s, 请求的内容是%s\\n发生的异常信息如下:%s\" % (url, request_content, e))52 r = None53 return r
3)封装接口用例执行方法
在应用目录下新建 task.py:
1 import time 2 import os 3 import traceback 4 import json 5 from . import models 6 from .utils.data_process import data_preprocess, assert_result, data_postprocess 7 from .utils.request_process import request_process 8 9 10 def case_task(test_case_id_list, server_address):11 global_key = \'case\'+ str(int(time.time() * 100000))12 os.environ[global_key] = \'{}\'13 print()14 print(\"全局变量标识符【global_key】: {}\".format(global_key))15 print(\"全局变量内容【os.environ[global_key]】: {}\".format(os.environ[global_key]))16 for test_case_id in test_case_id_list:17 print()18 test_case = models.TestCase.objects.filter(id=int(test_case_id))[0]19 print(\"######### 开始执行用例【{}】 #########\".format(test_case))20 execute_start_time = time.time() # 记录时间戳,便于计算总耗时(毫秒)21 request_data = test_case.request_data22 extract_var = test_case.extract_var23 assert_key = test_case.assert_key24 interface_name = test_case.uri25 belong_project = test_case.belong_project26 belong_module = test_case.belong_module27 maintainer = test_case.maintainer28 request_method = test_case.request_method29 print(\"初始请求数据: {}\".format(request_data))30 print(\"关联参数: {}\".format(extract_var))31 print(\"断言关键字: {}\".format(assert_key))32 print(\"接口名称: {}\".format(interface_name))33 print(\"所属项目: {}\".format(belong_project))34 print(\"所属模块: {}\".format(belong_module))35 print(\"用例维护人: {}\".format(maintainer))36 print(\"请求方法: {}\".format(request_method))37 url = \"{}{}\".format(server_address, interface_name)38 print(\"接口地址: {}\".format(url))39 code, request_data, error_msg = data_preprocess(global_key, str(request_data))40 try:41 res_data = request_process(url, request_method, json.loads(request_data))42 print(\"响应数据: {}\".format(json.dumps(res_data.json(), ensure_ascii=False))) # ensure_ascii:兼容中文43 result_flag, exception_info = assert_result(res_data, assert_key)44 if result_flag:45 print(\"用例【%s】执行成功!\" % test_case)46 if extract_var.strip() != \"None\":47 data_postprocess(global_key, json.dumps(res_data.json(), ensure_ascii=False), extract_var)48 else:49 print(\"用例【%s】执行失败!\" % test_case)50 except Exception as e:51 print(\"接口请求异常,error: {}\".format(traceback.format_exc()))
4)修改测试用例视图函数
1 from django.shortcuts import render, redirect, HttpResponse 2 from django.contrib import auth # Django用户认证(Auth)组件一般用在用户的登录注册上,用于判断当前的用户是否合法 3 from django.contrib.auth.decorators import login_required 4 from django.core.paginator import Paginator, PageNotAnInteger, InvalidPage 5 from .form import UserForm 6 import traceback 7 from .models import Project, Module, TestCase, CaseSuite, SuiteCase, InterfaceServer 8 from .task import case_task 9 10 11 # 封装分页处理 12 def get_paginator(request, data): 13 paginator = Paginator(data, 10) # 默认每页展示10条数据 14 # 获取 url 后面的 page 参数的值, 首页不显示 page 参数, 默认值是 1 15 page = request.GET.get(\'page\') 16 try: 17 paginator_pages = paginator.page(page) 18 except PageNotAnInteger: 19 # 如果请求的页数不是整数, 返回第一页。 20 paginator_pages = paginator.page(1) 21 except InvalidPage: 22 # 如果请求的页数不存在, 重定向页面 23 return HttpResponse(\'找不到页面的内容\') 24 return paginator_pages 25 26 27 # 项目菜单项 28 @login_required 29 def project(request): 30 print(\"request.user.is_authenticated: \", request.user.is_authenticated) 31 projects = Project.objects.filter().order_by(\'-id\') 32 print(\"projects:\", projects) 33 return render(request, \'project.html\', {\'projects\': get_paginator(request, projects)}) 34 35 36 # 模块菜单项 37 @login_required 38 def module(request): 39 if request.method == \"GET\": # 请求get时候,id倒序查询所有的模块数据 40 modules = Module.objects.filter().order_by(\'-id\') 41 return render(request, \'module.html\', {\'modules\': get_paginator(request, modules)}) 42 else: # 否则就是Post请求,会根据输入内容,使用模糊的方式查找所有的项目 43 proj_name = request.POST[\'proj_name\'] 44 projects = Project.objects.filter(name__contains=proj_name.strip()) 45 projs = [proj.id for proj in projects] 46 modules = Module.objects.filter(belong_project__in=projs) # 把项目中所有的模块都找出来 47 return render(request, \'module.html\', {\'modules\': get_paginator(request, modules), \'proj_name\': proj_name}) 48 49 50 # 获取测试用例执行的接口地址 51 def get_server_address(env): 52 if env: # 环境处理 53 env_data = InterfaceServer.objects.filter(env=env[0]) 54 print(\"env_data: {}\".format(env_data)) 55 if env_data: 56 ip = env_data[0].ip 57 port = env_data[0].port 58 print(\"ip: {}, port: {}\".format(ip, port)) 59 server_address = \"http://{}:{}\".format(ip, port) 60 print(\"server_address: {}\".format(server_address)) 61 return server_address 62 else: 63 return \"\" 64 else: 65 return \"\" 66 67 68 # 测试用例菜单项 69 @login_required 70 def test_case(request): 71 print(\"request.session[\'is_login\']: {}\".format(request.session[\'is_login\'])) 72 test_cases = \"\" 73 if request.method == \"GET\": 74 test_cases = TestCase.objects.filter().order_by(\'id\') 75 print(\"testcases: {}\".format(test_cases)) 76 elif request.method == \"POST\": 77 print(\"request.POST: {}\".format(request.POST)) 78 test_case_id_list = request.POST.getlist(\'test_cases_list\') 79 env = request.POST.getlist(\'env\') 80 print(\"env: {}\".format(env)) 81 server_address = get_server_address(env) 82 if not server_address: 83 return HttpResponse(\"提交的运行环境为空,请选择环境后再提交!\") 84 if test_case_id_list: 85 test_case_id_list.sort() 86 print(\"test_case_id_list: {}\".format(test_case_id_list)) 87 print(\"获取到用例,开始用例执行\") 88 case_task(test_case_id_list, server_address) 89 else: 90 print(\"运行测试用例失败\") 91 return HttpResponse(\"提交的运行测试用例为空,请选择用例后在提交!\") 92 test_cases = TestCase.objects.filter().order_by(\'id\') 93 return render(request, \'test_case.html\', {\'test_cases\': get_paginator(request, test_cases)}) 94 95 96 # 用例详情页 97 @login_required 98 def test_case_detail(request, test_case_id): 99 test_case_id = int(test_case_id)100 test_case = TestCase.objects.get(id=test_case_id)101 print(\"test_case: {}\".format(test_case))102 print(\"test_case.id: {}\".format(test_case.id))103 print(\"test_case.belong_project: {}\".format(test_case.belong_project))104 105 return render(request, \'test_case_detail.html\', {\'test_case\': test_case})106 107 108 # 模块页展示测试用例109 @login_required110 def module_test_cases(request, module_id):111 module = \"\"112 if module_id: # 访问的时候,会从url中提取模块的id,根据模块id查询到模块数据,在模板中展现113 module = Module.objects.get(id=int(module_id))114 test_cases = TestCase.objects.filter(belong_module=module).order_by(\'-id\')115 print(\"test_case in module_test_cases: {}\".format(test_cases))116 return render(request, \'test_case.html\', {\'test_cases\': get_paginator(request, test_cases)})117 118 119 # 用例集合菜单项120 @login_required121 def case_suite(request):122 case_suites = CaseSuite.objects.filter()123 return render(request, \'case_suite.html\', {\'case_suites\': get_paginator(request, case_suites)})124 125 126 # 用例集合-添加测试用例页127 @login_required128 def add_case_in_suite(request, suite_id):129 # 查询指定的用例集合130 case_suite = CaseSuite.objects.get(id=suite_id)131 # 根据id号查询所有的用例132 test_cases = TestCase.objects.filter().order_by(\'id\')133 if request.method == \"GET\":134 print(\"test cases:\", test_cases)135 elif request.method == \"POST\":136 test_cases_list = request.POST.getlist(\'testcases_list\')137 # 如果页面勾选了用例138 if test_cases_list:139 print(\"勾选用例id:\", test_cases_list)140 # 根据页面勾选的用例与查询出的所有用例一一比较141 for test_case in test_cases_list:142 test_case = TestCase.objects.get(id=int(test_case))143 # 匹配成功则添加用例144 SuiteCase.objects.create(case_suite=case_suite, test_case=test_case)145 # 未勾选用例146 else:147 print(\"添加测试用例失败\")148 return HttpResponse(\"添加的测试用例为空,请选择用例后再添加!\")149 return render(request, \'add_case_in_suite.html\',150 {\'test_cases\': get_paginator(request, test_cases), \'case_suite\': case_suite})151 152 153 # 用例集合页-查看/删除用例154 @login_required155 def show_and_delete_case_in_suite(request, suite_id):156 case_suite = CaseSuite.objects.get(id=suite_id)157 test_cases = SuiteCase.objects.filter(case_suite=case_suite)158 if request.method == \"POST\":159 test_cases_list = request.POST.getlist(\'test_cases_list\')160 if test_cases_list:161 print(\"勾选用例:\", test_cases_list)162 for test_case in test_cases_list:163 test_case = TestCase.objects.get(id=int(test_case))164 SuiteCase.objects.filter(case_suite=case_suite, test_case=test_case).first().delete()165 else:166 print(\"测试用例删除失败\")167 return HttpResponse(\"所选测试用例为空,请选择用例后再进行删除!\")168 case_suite = CaseSuite.objects.get(id=suite_id)169 return render(request, \'show_and_delete_case_in_suite.html\',170 {\'test_cases\': get_paginator(request, test_cases), \'case_suite\': case_suite})171 172 173 # 默认页的视图函数174 @login_required175 def index(request):176 return render(request, \'index.html\')177 178 179 # 登录页的视图函数180 def login(request):181 print(\"request.session.items(): {}\".format(request.session.items())) # 打印session信息182 if request.session.get(\'is_login\', None):183 return redirect(\'/\')184 # 如果是表单提交行为,则进行登录校验185 if request.method == \"POST\":186 login_form = UserForm(request.POST)187 message = \"请检查填写的内容!\"188 if login_form.is_valid():189 username = login_form.cleaned_data[\'username\']190 password = login_form.cleaned_data[\'password\']191 try:192 # 使用django提供的身份验证功能193 user = auth.authenticate(username=username, password=password) # 从auth_user表中匹配信息,匹配到则返回用户对象194 if user is not None:195 print(\"用户【%s】登录成功\" % username)196 auth.login(request, user)197 request.session[\'is_login\'] = True198 # 登录成功,跳转主页199 return redirect(\'/\')200 else:201 message = \"用户名不存在或者密码不正确!\"202 except:203 traceback.print_exc()204 message = \"登录程序出现异常\"205 # 用户名或密码为空,返回登录页和错误提示信息206 else:207 return render(request, \'login.html\', locals())208 # 不是表单提交,代表只是访问登录页209 else:210 login_form = UserForm()211 return render(request, \'login.html\', locals())212 213 214 # 注册页的视图函数215 def register(request):216 return render(request, \'register.html\')217 218 219 # 登出的视图函数:重定向至login视图函数220 @login_required221 def logout(request):222 auth.logout(request)223 request.session.flush()224 return redirect(\"/login/\")
9. 用例执行结果展示
9.1 定义模型类
1)models.py 中增加 TestCaseExecuteResult 模型类,用于记录用例执行结果。
1 from django.db import models 2 from smart_selects.db_fields import GroupedForeignKey # pip install django-smart-selects:后台级联选择 3 from django.contrib.auth.models import User 4 5 6 class Project(models.Model): 7 id = models.AutoField(primary_key=True) 8 name = models.CharField(\'项目名称\', max_length=50, unique=True, null=False) 9 proj_owner = models.CharField(\'项目负责人\', max_length=20, null=False) 10 test_owner = models.CharField(\'测试负责人\', max_length=20, null=False) 11 dev_owner = models.CharField(\'开发负责人\', max_length=20, null=False) 12 desc = models.CharField(\'项目描述\', max_length=100, null=True) 13 create_time = models.DateTimeField(\'项目创建时间\', auto_now_add=True) 14 update_time = models.DateTimeField(\'项目更新时间\', auto_now=True, null=True) 15 16 def __str__(self): 17 return self.name 18 19 class Meta: 20 verbose_name = \'项目信息表\' 21 verbose_name_plural = \'项目信息表\' 22 23 24 class Module(models.Model): 25 id = models.AutoField(primary_key=True) 26 name = models.CharField(\'模块名称\', max_length=50, null=False) 27 belong_project = models.ForeignKey(Project, on_delete=models.CASCADE) 28 test_owner = models.CharField(\'测试负责人\', max_length=50, null=False) 29 desc = models.CharField(\'简要描述\', max_length=100, null=True) 30 create_time = models.DateTimeField(\'创建时间\', auto_now_add=True) 31 update_time = models.DateTimeField(\'更新时间\', auto_now=True, null=True) 32 33 def __str__(self): 34 return self.name 35 36 class Meta: 37 verbose_name = \'模块信息表\' 38 verbose_name_plural = \'模块信息表\' 39 40 41 class TestCase(models.Model): 42 id = models.AutoField(primary_key=True) 43 case_name = models.CharField(\'用例名称\', max_length=50, null=False) # 如 register 44 belong_project = models.ForeignKey(Project, on_delete=models.CASCADE, verbose_name=\'所属项目\') 45 belong_module = GroupedForeignKey(Module, \"belong_project\", on_delete=models.CASCADE, verbose_name=\'所属模块\') 46 request_data = models.CharField(\'请求数据\', max_length=1024, null=False, default=\'\') 47 uri = models.CharField(\'接口地址\', max_length=1024, null=False, default=\'\') 48 assert_key = models.CharField(\'断言内容\', max_length=1024, null=True) 49 maintainer = models.CharField(\'编写人员\', max_length=1024, null=False, default=\'\') 50 extract_var = models.CharField(\'提取变量表达式\', max_length=1024, null=True) # 示例:userid||userid\": (\\d+) 51 request_method = models.CharField(\'请求方式\', max_length=1024, null=True) 52 status = models.IntegerField(null=True, help_text=\"0:表示有效,1:表示无效,用于软删除\") 53 created_time = models.DateTimeField(\'创建时间\', auto_now_add=True) 54 updated_time = models.DateTimeField(\'更新时间\', auto_now=True, null=True) 55 user = models.ForeignKey(User, on_delete=models.CASCADE, verbose_name=\'责任人\', null=True) 56 57 def __str__(self): 58 return self.case_name 59 60 class Meta: 61 verbose_name = \'测试用例表\' 62 verbose_name_plural = \'测试用例表\' 63 64 65 class CaseSuite(models.Model): 66 id = models.AutoField(primary_key=True) 67 suite_desc = models.CharField(\'用例集合描述\', max_length=100, blank=True, null=True) 68 if_execute = models.IntegerField(verbose_name=\'是否执行\', null=False, default=0, help_text=\'0:执行;1:不执行\') 69 test_case_model = models.CharField(\'测试执行模式\', max_length=100, blank=True, null=True, help_text=\'data/keyword\') 70 creator = models.CharField(max_length=50, blank=True, null=True) 71 create_time = models.DateTimeField(\'创建时间\', auto_now=True) # 创建时间-自动获取当前时间 72 73 class Meta: 74 verbose_name = \"用例集合表\" 75 verbose_name_plural = \'用例集合表\' 76 77 78 class SuiteCase(models.Model): 79 id = models.AutoField(primary_key=True) 80 case_suite = models.ForeignKey(CaseSuite, on_delete=models.CASCADE, verbose_name=\'用例集合\') 81 test_case = models.ForeignKey(TestCase, on_delete=models.CASCADE, verbose_name=\'测试用例\') 82 status = models.IntegerField(verbose_name=\'是否有效\', null=False, default=1, help_text=\'0:有效,1:无效\') 83 create_time = models.DateTimeField(\'创建时间\', auto_now=True) # 创建时间-自动获取当前时间 84 85 86 class InterfaceServer(models.Model): 87 id = models.AutoField(primary_key=True) 88 env = models.CharField(\'环境\', max_length=50, null=False, default=\'\') 89 ip = models.CharField(\'ip\', max_length=50, null=False, default=\'\') 90 port = models.CharField(\'端口\', max_length=100, null=False, default=\'\') 91 remark = models.CharField(\'备注\', max_length=100, null=True) 92 create_time = models.DateTimeField(\'创建时间\', auto_now_add=True) 93 update_time = models.DateTimeField(\'更新时间\', auto_now=True, null=True) 94 95 def __str__(self): 96 return self.env 97 98 class Meta: 99 verbose_name = \'接口地址配置表\'100 verbose_name_plural = \'接口地址配置表\'101 102 103 class TestCaseExecuteResult(models.Model):104 id = models.AutoField(primary_key=True)105 belong_test_case = GroupedForeignKey(TestCase, \"belong_test_case\", on_delete=models.CASCADE, verbose_name=\'所属用例\')106 status = models.IntegerField(null=True, help_text=\"0:表示未执行,1:表示已执行\")107 exception_info = models.CharField(max_length=2048, blank=True, null=True)108 request_data = models.CharField(\'请求体\', max_length=1024, null=True) # {\"code\": \"00\", \"userid\": 22889}109 response_data = models.CharField(\'响应字符串\', max_length=1024, null=True) # {\"code\": \"00\", \"userid\": 22889}110 execute_result = models.CharField(\'执行结果\', max_length=1024, null=True) # 成功/失败111 extract_var = models.CharField(\'关联参数\', max_length=1024, null=True) # 响应成功后提取变量112 last_time_response_data = models.CharField(\'上一次响应字符串\', max_length=1024, null=True) # {\"code\": \"00\", \"userid\": 22889}113 execute_total_time = models.CharField(\'执行耗时\', max_length=1024, null=True)114 execute_start_time = models.CharField(\'执行开始时间\', max_length=300, blank=True, null=True)115 execute_end_time = models.CharField(\'执行结束时间\', max_length=300, blank=True, null=True)116 created_time = models.DateTimeField(\'创建时间\', auto_now_add=True)117 updated_time = models.DateTimeField(\'更新时间\', auto_now=True, null=True)118 119 def __str__(self):120 return str(self.id)121 122 class Meta:123 verbose_name = \'用例执行结果记录表\'124 verbose_name_plural = \'用例执行结果记录表\'
2)数据迁移
python manage.py makemigrationspython manage.py migrate
9.2 修改用例执行封装函数,增加执行结果记录
修改应用目录下 task.py:
1 import time 2 import os 3 import traceback 4 import json 5 from . import models 6 from .utils.data_process import data_preprocess, assert_result, data_postprocess 7 from .utils.request_process import request_process 8 9 10 def case_task(test_case_id_list, server_address): 11 global_key = \'case\'+ str(int(time.time() * 100000)) 12 os.environ[global_key] = \'{}\' 13 print() 14 print(\"全局变量标识符【global_key】: {}\".format(global_key)) 15 print(\"全局变量内容【os.environ[global_key]】: {}\".format(os.environ[global_key])) 16 for test_case_id in test_case_id_list: 17 18 test_case = models.TestCase.objects.filter(id=int(test_case_id))[0] 19 last_execute_record_data = models.TestCaseExecuteResult.objects.filter( 20 belong_test_case_id=test_case_id).order_by(\'-id\') 21 if last_execute_record_data: 22 last_time_execute_response_data = last_execute_record_data[0].response_data 23 else: 24 last_time_execute_response_data = \'\' 25 print(\"上一次响应结果: {}\".format(last_execute_record_data)) 26 print(\"上一次响应时间: {}\".format(last_time_execute_response_data)) 27 execute_record = models.TestCaseExecuteResult.objects.create(belong_test_case=test_case) 28 execute_record.last_time_response_data = last_time_execute_response_data 29 # 获取当前用例上一次执行结果 30 execute_record.save() 31 32 test_case = models.TestCase.objects.filter(id=int(test_case_id))[0] 33 print(\"\\n######### 开始执行用例【{}】 #########\".format(test_case)) 34 execute_start_time = time.time() # 记录时间戳,便于计算总耗时(毫秒) 35 execute_record.execute_start_time = time.strftime(\"%Y-%m-%d %H:%M:%S\", time.localtime(execute_start_time)) 36 37 request_data = test_case.request_data 38 extract_var = test_case.extract_var 39 assert_key = test_case.assert_key 40 interface_name = test_case.uri 41 belong_project = test_case.belong_project 42 belong_module = test_case.belong_module 43 maintainer = test_case.maintainer 44 request_method = test_case.request_method 45 print(\"初始请求数据: {}\".format(request_data)) 46 print(\"关联参数: {}\".format(extract_var)) 47 print(\"断言关键字: {}\".format(assert_key)) 48 print(\"接口名称: {}\".format(interface_name)) 49 print(\"所属项目: {}\".format(belong_project)) 50 print(\"所属模块: {}\".format(belong_module)) 51 print(\"用例维护人: {}\".format(maintainer)) 52 print(\"请求方法: {}\".format(request_method)) 53 url = \"{}{}\".format(server_address, interface_name) 54 print(\"接口地址: {}\".format(url)) 55 code, request_data, error_msg = data_preprocess(global_key, str(request_data)) 56 # 请求数据预处理异常,结束用例执行 57 if code != 0: 58 print(\"数据处理异常,error: {}\".format(error_msg)) 59 execute_record.execute_result = \"失败\" 60 execute_record.status = 1 61 execute_record.exception_info = error_msg 62 execute_end_time = time.time() 63 execute_record.execute_end_time = time.strftime(\"%Y-%m-%d %H:%M:%S\", time.localtime(execute_end_time)) 64 execute_record.execute_total_time = int(execute_end_time - execute_start_time) * 1000 65 execute_record.save() 66 return 67 # 记录请求预处理结果 68 else: 69 execute_record.request_data = request_data 70 # 调用接口 71 try: 72 res_data = request_process(url, request_method, json.loads(request_data)) 73 print(\"响应数据: {}\".format(json.dumps(res_data.json(), ensure_ascii=False))) # ensure_ascii:兼容中文 74 result_flag, exception_info = assert_result(res_data, assert_key) 75 # 结果记录保存 76 if result_flag: 77 print(\"用例【%s】执行成功!\" % test_case) 78 execute_record.execute_result = \"成功\" 79 if extract_var.strip() != \"None\": 80 var_value = data_postprocess(global_key, json.dumps(res_data.json(), ensure_ascii=False), extract_var) 81 execute_record.extract_var = var_value 82 else: 83 print(\"用例【%s】执行失败!\" % test_case) 84 execute_record.execute_result = \"失败\" 85 execute_record.exception_info = exception_info 86 execute_record.response_data = json.dumps(res_data.json(), ensure_ascii=False) 87 execute_record.status = 1 88 execute_end_time = time.time() 89 execute_record.execute_end_time = time.strftime(\"%Y-%m-%d %H:%M:%S\", time.localtime(execute_end_time)) 90 print(\"执行结果结束时间: {}\".format(execute_record.execute_end_time)) 91 execute_record.execute_total_time = int((execute_end_time - execute_start_time) * 1000) 92 print(\"用例执行耗时: {}\".format(execute_record.execute_total_time)) 93 execute_record.save() 94 except Exception as e: 95 print(\"接口请求异常,error: {}\".format(traceback.format_exc())) 96 execute_record.execute_result = \"失败\" 97 execute_record.exception_info = traceback.format_exc() 98 execute_record.status = 1 99 execute_end_time = time.time()100 execute_record.execute_end_time = time.strftime(\"%Y-%m-%d %H:%M:%S\", time.localtime(execute_end_time))101 print(\"执行结果结束时间: {}\".format(execute_record.execute_end_time))102 execute_record.execute_total_time = int(execute_end_time - execute_start_time) * 1000103 print(\"用例执行耗时: {} 毫秒\".format(execute_record.execute_total_time))104 execute_record.save()
前端执行测试用例,查看用例执行结果表数据:
9.3 定义路由
在前面已经获取到用例结果数据并保存,下面处理一下用例结果展示。
from django.urls import path, re_pathfrom . import viewsurlpatterns = [ path(\'\', views.index), path(\'login/\', views.login), path(\'logout/\', views.logout), path(\'project/\', views.project, name=\'project\'), path(\'module/\', views.module, name=\'module\'), path(\'test_case/\', views.test_case, name=\"test_case\"), re_path(\'test_case_detail/(?P[0-9]+)\', views.test_case_detail, name=\"test_case_detail\"), re_path(\'module_test_cases/(?P[0-9]+)/$\', views.module_test_cases, name=\"module_test_cases\"), path(\'case_suite/\', views.case_suite, name=\"case_suite\"), re_path(\'add_case_in_suite/(?P[0-9]+)\', views.add_case_in_suite, name=\"add_case_in_suite\"), re_path(\'show_and_delete_case_in_suite/(?P[0-9]+)\', views.show_and_delete_case_in_suite, name=\"show_and_delete_case_in_suite\"), path(\'test_case_execute_record/\', views.test_case_execute_record, name=\"test_case_execute_record\"),]
9.4 定义视图函数
1 from django.shortcuts import render, redirect, HttpResponse 2 from django.contrib import auth # Django用户认证(Auth)组件一般用在用户的登录注册上,用于判断当前的用户是否合法 3 from django.contrib.auth.decorators import login_required 4 from django.core.paginator import Paginator, PageNotAnInteger, InvalidPage 5 from .form import UserForm 6 import traceback 7 from .models import Project, Module, TestCase, CaseSuite, SuiteCase, InterfaceServer, TestCaseExecuteResult 8 from .task import case_task 9 10 11 # 封装分页处理 12 def get_paginator(request, data): 13 paginator = Paginator(data, 10) # 默认每页展示10条数据 14 # 获取 url 后面的 page 参数的值, 首页不显示 page 参数, 默认值是 1 15 page = request.GET.get(\'page\') 16 try: 17 paginator_pages = paginator.page(page) 18 except PageNotAnInteger: 19 # 如果请求的页数不是整数, 返回第一页。 20 paginator_pages = paginator.page(1) 21 except InvalidPage: 22 # 如果请求的页数不存在, 重定向页面 23 return HttpResponse(\'找不到页面的内容\') 24 return paginator_pages 25 26 27 # 项目菜单项 28 @login_required 29 def project(request): 30 print(\"request.user.is_authenticated: \", request.user.is_authenticated) 31 projects = Project.objects.filter().order_by(\'-id\') 32 print(\"projects:\", projects) 33 return render(request, \'project.html\', {\'projects\': get_paginator(request, projects)}) 34 35 36 # 模块菜单项 37 @login_required 38 def module(request): 39 if request.method == \"GET\": # 请求get时候,id倒序查询所有的模块数据 40 modules = Module.objects.filter().order_by(\'-id\') 41 return render(request, \'module.html\', {\'modules\': get_paginator(request, modules)}) 42 else: # 否则就是Post请求,会根据输入内容,使用模糊的方式查找所有的项目 43 proj_name = request.POST[\'proj_name\'] 44 projects = Project.objects.filter(name__contains=proj_name.strip()) 45 projs = [proj.id for proj in projects] 46 modules = Module.objects.filter(belong_project__in=projs) # 把项目中所有的模块都找出来 47 return render(request, \'module.html\', {\'modules\': get_paginator(request, modules), \'proj_name\': proj_name}) 48 49 50 # 获取测试用例执行的接口地址 51 def get_server_address(env): 52 if env: # 环境处理 53 env_data = InterfaceServer.objects.filter(env=env[0]) 54 print(\"env_data: {}\".format(env_data)) 55 if env_data: 56 ip = env_data[0].ip 57 port = env_data[0].port 58 print(\"ip: {}, port: {}\".format(ip, port)) 59 server_address = \"http://{}:{}\".format(ip, port) 60 print(\"server_address: {}\".format(server_address)) 61 return server_address 62 else: 63 return \"\" 64 else: 65 return \"\" 66 67 68 # 测试用例菜单项 69 @login_required 70 def test_case(request): 71 print(\"request.session[\'is_login\']: {}\".format(request.session[\'is_login\'])) 72 test_cases = \"\" 73 if request.method == \"GET\": 74 test_cases = TestCase.objects.filter().order_by(\'id\') 75 print(\"testcases: {}\".format(test_cases)) 76 elif request.method == \"POST\": 77 print(\"request.POST: {}\".format(request.POST)) 78 test_case_id_list = request.POST.getlist(\'test_cases_list\') 79 env = request.POST.getlist(\'env\') 80 print(\"env: {}\".format(env)) 81 server_address = get_server_address(env) 82 if not server_address: 83 return HttpResponse(\"提交的运行环境为空,请选择环境后再提交!\") 84 if test_case_id_list: 85 test_case_id_list.sort() 86 print(\"test_case_id_list: {}\".format(test_case_id_list)) 87 print(\"获取到用例,开始用例执行\") 88 case_task(test_case_id_list, server_address) 89 else: 90 print(\"运行测试用例失败\") 91 return HttpResponse(\"提交的运行测试用例为空,请选择用例后在提交!\") 92 test_cases = TestCase.objects.filter().order_by(\'id\') 93 return render(request, \'test_case.html\', {\'test_cases\': get_paginator(request, test_cases)}) 94 95 96 # 用例详情页 97 @login_required 98 def test_case_detail(request, test_case_id): 99 test_case_id = int(test_case_id)100 test_case = TestCase.objects.get(id=test_case_id)101 print(\"test_case: {}\".format(test_case))102 print(\"test_case.id: {}\".format(test_case.id))103 print(\"test_case.belong_project: {}\".format(test_case.belong_project))104 105 return render(request, \'test_case_detail.html\', {\'test_case\': test_case})106 107 108 # 模块页展示测试用例109 @login_required110 def module_test_cases(request, module_id):111 module = \"\"112 if module_id: # 访问的时候,会从url中提取模块的id,根据模块id查询到模块数据,在模板中展现113 module = Module.objects.get(id=int(module_id))114 test_cases = TestCase.objects.filter(belong_module=module).order_by(\'-id\')115 print(\"test_case in module_test_cases: {}\".format(test_cases))116 return render(request, \'test_case.html\', {\'test_cases\': get_paginator(request, test_cases)})117 118 119 # 用例集合菜单项120 @login_required121 def case_suite(request):122 case_suites = CaseSuite.objects.filter()123 return render(request, \'case_suite.html\', {\'case_suites\': get_paginator(request, case_suites)})124 125 126 # 用例集合-添加测试用例页127 @login_required128 def add_case_in_suite(request, suite_id):129 # 查询指定的用例集合130 case_suite = CaseSuite.objects.get(id=suite_id)131 # 根据id号查询所有的用例132 test_cases = TestCase.objects.filter().order_by(\'id\')133 if request.method == \"GET\":134 print(\"test cases:\", test_cases)135 elif request.method == \"POST\":136 test_cases_list = request.POST.getlist(\'testcases_list\')137 # 如果页面勾选了用例138 if test_cases_list:139 print(\"勾选用例id:\", test_cases_list)140 # 根据页面勾选的用例与查询出的所有用例一一比较141 for test_case in test_cases_list:142 test_case = TestCase.objects.get(id=int(test_case))143 # 匹配成功则添加用例144 SuiteCase.objects.create(case_suite=case_suite, test_case=test_case)145 # 未勾选用例146 else:147 print(\"添加测试用例失败\")148 return HttpResponse(\"添加的测试用例为空,请选择用例后再添加!\")149 return render(request, \'add_case_in_suite.html\',150 {\'test_cases\': get_paginator(request, test_cases), \'case_suite\': case_suite})151 152 153 # 用例集合页-查看/删除用例154 @login_required155 def show_and_delete_case_in_suite(request, suite_id):156 case_suite = CaseSuite.objects.get(id=suite_id)157 test_cases = SuiteCase.objects.filter(case_suite=case_suite)158 if request.method == \"POST\":159 test_cases_list = request.POST.getlist(\'test_cases_list\')160 if test_cases_list:161 print(\"勾选用例:\", test_cases_list)162 for test_case in test_cases_list:163 test_case = TestCase.objects.get(id=int(test_case))164 SuiteCase.objects.filter(case_suite=case_suite, test_case=test_case).first().delete()165 else:166 print(\"测试用例删除失败\")167 return HttpResponse(\"所选测试用例为空,请选择用例后再进行删除!\")168 case_suite = CaseSuite.objects.get(id=suite_id)169 return render(request, \'show_and_delete_case_in_suite.html\',170 {\'test_cases\': get_paginator(request, test_cases), \'case_suite\': case_suite})171 172 173 @login_required174 def test_case_execute_record(request):175 test_case_execute_records = TestCaseExecuteResult.objects.filter().order_by(\'-id\')176 return render(request, \'test_case_execute_records.html\', {\'test_case_execute_records\': get_paginator(request, test_case_execute_records)})177 178 179 # 默认页的视图函数180 @login_required181 def index(request):182 return render(request, \'index.html\')183 184 185 # 登录页的视图函数186 def login(request):187 print(\"request.session.items(): {}\".format(request.session.items())) # 打印session信息188 if request.session.get(\'is_login\', None):189 return redirect(\'/\')190 # 如果是表单提交行为,则进行登录校验191 if request.method == \"POST\":192 login_form = UserForm(request.POST)193 message = \"请检查填写的内容!\"194 if login_form.is_valid():195 username = login_form.cleaned_data[\'username\']196 password = login_form.cleaned_data[\'password\']197 try:198 # 使用django提供的身份验证功能199 user = auth.authenticate(username=username, password=password) # 从auth_user表中匹配信息,匹配到则返回用户对象200 if user is not None:201 print(\"用户【%s】登录成功\" % username)202 auth.login(request, user)203 request.session[\'is_login\'] = True204 # 登录成功,跳转主页205 return redirect(\'/\')206 else:207 message = \"用户名不存在或者密码不正确!\"208 except:209 traceback.print_exc()210 message = \"登录程序出现异常\"211 # 用户名或密码为空,返回登录页和错误提示信息212 else:213 return render(request, \'login.html\', locals())214 # 不是表单提交,代表只是访问登录页215 else:216 login_form = UserForm()217 return render(request, \'login.html\', locals())218 219 220 # 注册页的视图函数221 def register(request):222 return render(request, \'register.html\')223 224 225 # 登出的视图函数:重定向至login视图函数226 @login_required227 def logout(request):228 auth.logout(request)229 request.session.flush()230 return redirect(\"/login/\")
9.5 定义模板
1)新增”测试执行记录“模板文件:templates/test_case_execute_records.html
1 {% extends \'base.html\' %} 2 {% load static %} 3 {% block title %}用例执行记录{% endblock %} 4 {% block content %} 5 6770 {% endblock %}8 9
48 49 {# 实现分页标签的代码 #}50 {# 这里使用 bootstrap 渲染页面 #}5110 22 23 24 25 {% for testrecord in test_case_execute_records %}26id 11名称 12请求数据 13执行返回结果 14操作 15断言内容 16执行结果 17异常信息 18请求后提取变量 19开始时间 20执行耗时(ms) 2127 44 {% endfor %}45 46 47{{ testrecord.id }} 28{{ testrecord.belong_test_case.case_name }} 29{{ testrecord.request_data }} 30{{ testrecord.response_data }} 31对比差异 32{{ testrecord.belong_test_case.assert_key }} 33{{ testrecord.execute_result|default_if_none:\"\" }} 34 {% if testrecord.exception_info %}35显示异常信息 36 {% else %}37无 38 {% endif %}39 40{{ testrecord.extract_var }} 41{{ testrecord.execute_start_time }} 42{{ testrecord.execute_total_time }} 4352 6869
2)修改 base.html:新增“用例执行结果”菜单项
1 2 3 {% load static %} 4 5 6 7 8 9{% block title %}base{% endblock %} 10 11 12 13 14 15 16 17 21 {% block css %}{% endblock %}22 23 24 58 59 {% block content %}{% endblock %}60 61 62 63 64 65 66 67
页面效果如下:
9.6 结果对比差异
在用例执行结果页面,可以看到在“操作”列,有“对比差异”链接,该功能用于对比当前用例上一次的执行结果与当前的执行结果,便于查看结果的差异。
由于在前面用例执行时,已经在结果记录环节获取到当前用例上一次的结果并记录到当前用例记录数据中,下面来处理一下这个页面的展示。
1) 定义路由
from django.urls import path, re_pathfrom . import viewsurlpatterns = [ path(\'\', views.index), path(\'login/\', views.login), path(\'logout/\', views.logout), path(\'project/\', views.project, name=\'project\'), path(\'module/\', views.module, name=\'module\'), path(\'test_case/\', views.test_case, name=\"test_case\"), re_path(\'test_case_detail/(?P[0-9]+)\', views.test_case_detail, name=\"test_case_detail\"), re_path(\'module_test_cases/(?P[0-9]+)/$\', views.module_test_cases, name=\"module_test_cases\"), path(\'case_suite/\', views.case_suite, name=\"case_suite\"), re_path(\'add_case_in_suite/(?P[0-9]+)\', views.add_case_in_suite, name=\"add_case_in_suite\"), re_path(\'show_and_delete_case_in_suite/(?P[0-9]+)\', views.show_and_delete_case_in_suite, name=\"show_and_delete_case_in_suite\"), path(\'test_case_execute_record/\', views.test_case_execute_record, name=\"test_case_execute_record\"), re_path(\'case_result_diff/(?P[0-9]+)\', views.case_result_diff, name=\"case_result_diff\"),]
2)定义视图函数
1 from django.shortcuts import render, redirect, HttpResponse 2 from django.contrib import auth # Django用户认证(Auth)组件一般用在用户的登录注册上,用于判断当前的用户是否合法 3 from django.contrib.auth.decorators import login_required 4 from django.core.paginator import Paginator, PageNotAnInteger, InvalidPage 5 from .form import UserForm 6 import traceback 7 import json 8 from .models import Project, Module, TestCase, CaseSuite, SuiteCase, InterfaceServer, TestCaseExecuteResult 9 from .task import case_task 10 11 12 # 封装分页处理 13 def get_paginator(request, data): 14 paginator = Paginator(data, 10) # 默认每页展示10条数据 15 # 获取 url 后面的 page 参数的值, 首页不显示 page 参数, 默认值是 1 16 page = request.GET.get(\'page\') 17 try: 18 paginator_pages = paginator.page(page) 19 except PageNotAnInteger: 20 # 如果请求的页数不是整数, 返回第一页。 21 paginator_pages = paginator.page(1) 22 except InvalidPage: 23 # 如果请求的页数不存在, 重定向页面 24 return HttpResponse(\'找不到页面的内容\') 25 return paginator_pages 26 27 28 # 项目菜单项 29 @login_required 30 def project(request): 31 print(\"request.user.is_authenticated: \", request.user.is_authenticated) 32 projects = Project.objects.filter().order_by(\'-id\') 33 print(\"projects:\", projects) 34 return render(request, \'project.html\', {\'projects\': get_paginator(request, projects)}) 35 36 37 # 模块菜单项 38 @login_required 39 def module(request): 40 if request.method == \"GET\": # 请求get时候,id倒序查询所有的模块数据 41 modules = Module.objects.filter().order_by(\'-id\') 42 return render(request, \'module.html\', {\'modules\': get_paginator(request, modules)}) 43 else: # 否则就是Post请求,会根据输入内容,使用模糊的方式查找所有的项目 44 proj_name = request.POST[\'proj_name\'] 45 projects = Project.objects.filter(name__contains=proj_name.strip()) 46 projs = [proj.id for proj in projects] 47 modules = Module.objects.filter(belong_project__in=projs) # 把项目中所有的模块都找出来 48 return render(request, \'module.html\', {\'modules\': get_paginator(request, modules), \'proj_name\': proj_name}) 49 50 51 # 获取测试用例执行的接口地址 52 def get_server_address(env): 53 if env: # 环境处理 54 env_data = InterfaceServer.objects.filter(env=env[0]) 55 print(\"env_data: {}\".format(env_data)) 56 if env_data: 57 ip = env_data[0].ip 58 port = env_data[0].port 59 print(\"ip: {}, port: {}\".format(ip, port)) 60 server_address = \"http://{}:{}\".format(ip, port) 61 print(\"server_address: {}\".format(server_address)) 62 return server_address 63 else: 64 return \"\" 65 else: 66 return \"\" 67 68 69 # 测试用例菜单项 70 @login_required 71 def test_case(request): 72 print(\"request.session[\'is_login\']: {}\".format(request.session[\'is_login\'])) 73 test_cases = \"\" 74 if request.method == \"GET\": 75 test_cases = TestCase.objects.filter().order_by(\'id\') 76 print(\"testcases: {}\".format(test_cases)) 77 elif request.method == \"POST\": 78 print(\"request.POST: {}\".format(request.POST)) 79 test_case_id_list = request.POST.getlist(\'test_cases_list\') 80 env = request.POST.getlist(\'env\') 81 print(\"env: {}\".format(env)) 82 server_address = get_server_address(env) 83 if not server_address: 84 return HttpResponse(\"提交的运行环境为空,请选择环境后再提交!\") 85 if test_case_id_list: 86 test_case_id_list.sort() 87 print(\"test_case_id_list: {}\".format(test_case_id_list)) 88 print(\"获取到用例,开始用例执行\") 89 case_task(test_case_id_list, server_address) 90 else: 91 print(\"运行测试用例失败\") 92 return HttpResponse(\"提交的运行测试用例为空,请选择用例后在提交!\") 93 test_cases = TestCase.objects.filter().order_by(\'id\') 94 return render(request, \'test_case.html\', {\'test_cases\': get_paginator(request, test_cases)}) 95 96 97 # 用例详情页 98 @login_required 99 def test_case_detail(request, test_case_id):100 test_case_id = int(test_case_id)101 test_case = TestCase.objects.get(id=test_case_id)102 print(\"test_case: {}\".format(test_case))103 print(\"test_case.id: {}\".format(test_case.id))104 print(\"test_case.belong_project: {}\".format(test_case.belong_project))105 106 return render(request, \'test_case_detail.html\', {\'test_case\': test_case})107 108 109 # 模块页展示测试用例110 @login_required111 def module_test_cases(request, module_id):112 module = \"\"113 if module_id: # 访问的时候,会从url中提取模块的id,根据模块id查询到模块数据,在模板中展现114 module = Module.objects.get(id=int(module_id))115 test_cases = TestCase.objects.filter(belong_module=module).order_by(\'-id\')116 print(\"test_case in module_test_cases: {}\".format(test_cases))117 return render(request, \'test_case.html\', {\'test_cases\': get_paginator(request, test_cases)})118 119 120 # 用例集合菜单项121 @login_required122 def case_suite(request):123 case_suites = CaseSuite.objects.filter()124 return render(request, \'case_suite.html\', {\'case_suites\': get_paginator(request, case_suites)})125 126 127 # 用例集合-添加测试用例页128 @login_required129 def add_case_in_suite(request, suite_id):130 # 查询指定的用例集合131 case_suite = CaseSuite.objects.get(id=suite_id)132 # 根据id号查询所有的用例133 test_cases = TestCase.objects.filter().order_by(\'id\')134 if request.method == \"GET\":135 print(\"test cases:\", test_cases)136 elif request.method == \"POST\":137 test_cases_list = request.POST.getlist(\'testcases_list\')138 # 如果页面勾选了用例139 if test_cases_list:140 print(\"勾选用例id:\", test_cases_list)141 # 根据页面勾选的用例与查询出的所有用例一一比较142 for test_case in test_cases_list:143 test_case = TestCase.objects.get(id=int(test_case))144 # 匹配成功则添加用例145 SuiteCase.objects.create(case_suite=case_suite, test_case=test_case)146 # 未勾选用例147 else:148 print(\"添加测试用例失败\")149 return HttpResponse(\"添加的测试用例为空,请选择用例后再添加!\")150 return render(request, \'add_case_in_suite.html\',151 {\'test_cases\': get_paginator(request, test_cases), \'case_suite\': case_suite})152 153 154 # 用例集合页-查看/删除用例155 @login_required156 def show_and_delete_case_in_suite(request, suite_id):157 case_suite = CaseSuite.objects.get(id=suite_id)158 test_cases = SuiteCase.objects.filter(case_suite=case_suite)159 if request.method == \"POST\":160 test_cases_list = request.POST.getlist(\'test_cases_list\')161 if test_cases_list:162 print(\"勾选用例:\", test_cases_list)163 for test_case in test_cases_list:164 test_case = TestCase.objects.get(id=int(test_case))165 SuiteCase.objects.filter(case_suite=case_suite, test_case=test_case).first().delete()166 else:167 print(\"测试用例删除失败\")168 return HttpResponse(\"所选测试用例为空,请选择用例后再进行删除!\")169 case_suite = CaseSuite.objects.get(id=suite_id)170 return render(request, \'show_and_delete_case_in_suite.html\',171 {\'test_cases\': get_paginator(request, test_cases), \'case_suite\': case_suite})172 173 174 # 用例执行结果菜单项175 @login_required176 def test_case_execute_record(request):177 test_case_execute_records = TestCaseExecuteResult.objects.filter().order_by(\'-id\')178 return render(request, \'test_case_execute_records.html\', {\'test_case_execute_records\': get_paginator(request, test_case_execute_records)})179 180 181 # 用例执行结果-对比差异182 @login_required183 def diffCaseResponse(request, test_record_id):184 test_record_data = TestCaseExecuteResult.objects.get(id=test_record_id)185 print(\"用例执行结果记录: {}\".format(test_record_data))186 present_response = test_record_data.response_data187 if present_response:188 present_response = json.dumps(json.loads(present_response), sort_keys=True, indent=4,189 ensure_ascii=False) # 中文字符不转ascii编码190 print(\"当前响应结果: {}\".format(present_response))191 last_time_execute_response = test_record_data.last_time_response_data192 if last_time_execute_response:193 last_time_execute_response = json.dumps(json.loads(last_time_execute_response), sort_keys=True, indent=4,194 ensure_ascii=False)195 print(\"上一次响应结果: {}\".format(last_time_execute_response))196 return render(request, \'case_result_diff.html\', locals())197 198 199 # 默认页的视图函数200 @login_required201 def index(request):202 return render(request, \'index.html\')203 204 205 # 登录页的视图函数206 def login(request):207 print(\"request.session.items(): {}\".format(request.session.items())) # 打印session信息208 if request.session.get(\'is_login\', None):209 return redirect(\'/\')210 # 如果是表单提交行为,则进行登录校验211 if request.method == \"POST\":212 login_form = UserForm(request.POST)213 message = \"请检查填写的内容!\"214 if login_form.is_valid():215 username = login_form.cleaned_data[\'username\']216 password = login_form.cleaned_data[\'password\']217 try:218 # 使用django提供的身份验证功能219 user = auth.authenticate(username=username, password=password) # 从auth_user表中匹配信息,匹配到则返回用户对象220 if user is not None:221 print(\"用户【%s】登录成功\" % username)222 auth.login(request, user)223 request.session[\'is_login\'] = True224 # 登录成功,跳转主页225 return redirect(\'/\')226 else:227 message = \"用户名不存在或者密码不正确!\"228 except:229 traceback.print_exc()230 message = \"登录程序出现异常\"231 # 用户名或密码为空,返回登录页和错误提示信息232 else:233 return render(request, \'login.html\', locals())234 # 不是表单提交,代表只是访问登录页235 else:236 login_form = UserForm()237 return render(request, \'login.html\', locals())238 239 240 # 注册页的视图函数241 def register(request):242 return render(request, \'register.html\')243 244 245 # 登出的视图函数:重定向至login视图函数246 @login_required247 def logout(request):248 auth.logout(request)249 request.session.flush()250 return redirect(\"/login/\")
3)定义模板
新增 case_result_diff.html:
1 {% extends \'base.html\' %} 2 {% load static %} 3 {% block title %}结果对比差异{% endblock %} 4 5 {% block content %} 6
上次执行结果 | 10本次执行结果 | 11
---|---|
16 17
{{ last_time_execute_response | safe }} 18 19 |
21
{{ present_response | safe }} 22 |
26 27 {% endblock %}
修改 test_case_execute_records.html:增加“对比差异”链接
{% extends \'base.html\' %}{% load static %}{% block title %}用例执行记录{% endblock %}{% block content %}{% endblock %}{# 实现分页标签的代码 #} {# 这里使用 bootstrap 渲染页面 #}
{% for testrecord in test_case_execute_records %} id 名称 请求数据 执行返回结果 操作 断言内容 执行结果 异常信息 请求后提取变量 开始时间 执行耗时(ms) {% endfor %} {{ testrecord.id }} {{ testrecord.belong_test_case.case_name }} {{ testrecord.request_data }} {{ testrecord.response_data }} 对比差异 {{ testrecord.belong_test_case.assert_key }} {{ testrecord.execute_result|default_if_none:\"\" }} {% if testrecord.exception_info %}显示异常信息 {% else %}无 {% endif %}{{ testrecord.extract_var }} {{ testrecord.execute_start_time }} {{ testrecord.execute_total_time }}
9.7 异常信息展示
1)定义路由
from django.urls import path, re_pathfrom . import viewsurlpatterns = [ path(\'\', views.index), path(\'login/\', views.login), path(\'logout/\', views.logout), path(\'project/\', views.project, name=\'project\'), path(\'module/\', views.module, name=\'module\'), path(\'test_case/\', views.test_case, name=\"test_case\"), re_path(\'test_case_detail/(?P[0-9]+)\', views.test_case_detail, name=\"test_case_detail\"), re_path(\'module_test_cases/(?P[0-9]+)/$\', views.module_test_cases, name=\"module_test_cases\"), path(\'case_suite/\', views.case_suite, name=\"case_suite\"), re_path(\'add_case_in_suite/(?P[0-9]+)\', views.add_case_in_suite, name=\"add_case_in_suite\"), re_path(\'show_and_delete_case_in_suite/(?P[0-9]+)\', views.show_and_delete_case_in_suite, name=\"show_and_delete_case_in_suite\"), path(\'test_case_execute_record/\', views.test_case_execute_record, name=\"test_case_execute_record\"), re_path(\'case_result_diff/(?P[0-9]+)\', views.case_result_diff, name=\"case_result_diff\"), re_path(\'show_exception/(?P[0-9]+)$\', views.show_exception, name=\"show_exception\"),]
2)定义视图函数
1 from django.shortcuts import render, redirect, HttpResponse 2 from django.contrib import auth # Django用户认证(Auth)组件一般用在用户的登录注册上,用于判断当前的用户是否合法 3 from django.contrib.auth.decorators import login_required 4 from django.core.paginator import Paginator, PageNotAnInteger, InvalidPage 5 from .form import UserForm 6 import traceback 7 import json 8 from .models import Project, Module, TestCase, CaseSuite, SuiteCase, InterfaceServer, TestCaseExecuteResult 9 from .task import case_task 10 11 12 # 封装分页处理 13 def get_paginator(request, data): 14 paginator = Paginator(data, 10) # 默认每页展示10条数据 15 # 获取 url 后面的 page 参数的值, 首页不显示 page 参数, 默认值是 1 16 page = request.GET.get(\'page\') 17 try: 18 paginator_pages = paginator.page(page) 19 except PageNotAnInteger: 20 # 如果请求的页数不是整数, 返回第一页。 21 paginator_pages = paginator.page(1) 22 except InvalidPage: 23 # 如果请求的页数不存在, 重定向页面 24 return HttpResponse(\'找不到页面的内容\') 25 return paginator_pages 26 27 28 # 项目菜单项 29 @login_required 30 def project(request): 31 print(\"request.user.is_authenticated: \", request.user.is_authenticated) 32 projects = Project.objects.filter().order_by(\'-id\') 33 print(\"projects:\", projects) 34 return render(request, \'project.html\', {\'projects\': get_paginator(request, projects)}) 35 36 37 # 模块菜单项 38 @login_required 39 def module(request): 40 if request.method == \"GET\": # 请求get时候,id倒序查询所有的模块数据 41 modules = Module.objects.filter().order_by(\'-id\') 42 return render(request, \'module.html\', {\'modules\': get_paginator(request, modules)}) 43 else: # 否则就是Post请求,会根据输入内容,使用模糊的方式查找所有的项目 44 proj_name = request.POST[\'proj_name\'] 45 projects = Project.objects.filter(name__contains=proj_name.strip()) 46 projs = [proj.id for proj in projects] 47 modules = Module.objects.filter(belong_project__in=projs) # 把项目中所有的模块都找出来 48 return render(request, \'module.html\', {\'modules\': get_paginator(request, modules), \'proj_name\': proj_name}) 49 50 51 # 获取测试用例执行的接口地址 52 def get_server_address(env): 53 if env: # 环境处理 54 env_data = InterfaceServer.objects.filter(env=env[0]) 55 print(\"env_data: {}\".format(env_data)) 56 if env_data: 57 ip = env_data[0].ip 58 port = env_data[0].port 59 print(\"ip: {}, port: {}\".format(ip, port)) 60 server_address = \"http://{}:{}\".format(ip, port) 61 print(\"server_address: {}\".format(server_address)) 62 return server_address 63 else: 64 return \"\" 65 else: 66 return \"\" 67 68 69 # 测试用例菜单项 70 @login_required 71 def test_case(request): 72 print(\"request.session[\'is_login\']: {}\".format(request.session[\'is_login\'])) 73 test_cases = \"\" 74 if request.method == \"GET\": 75 test_cases = TestCase.objects.filter().order_by(\'id\') 76 print(\"testcases: {}\".format(test_cases)) 77 elif request.method == \"POST\": 78 print(\"request.POST: {}\".format(request.POST)) 79 test_case_id_list = request.POST.getlist(\'test_cases_list\') 80 env = request.POST.getlist(\'env\') 81 print(\"env: {}\".format(env)) 82 server_address = get_server_address(env) 83 if not server_address: 84 return HttpResponse(\"提交的运行环境为空,请选择环境后再提交!\") 85 if test_case_id_list: 86 test_case_id_list.sort() 87 print(\"test_case_id_list: {}\".format(test_case_id_list)) 88 print(\"获取到用例,开始用例执行\") 89 case_task(test_case_id_list, server_address) 90 else: 91 print(\"运行测试用例失败\") 92 return HttpResponse(\"提交的运行测试用例为空,请选择用例后在提交!\") 93 test_cases = TestCase.objects.filter().order_by(\'id\') 94 return render(request, \'test_case.html\', {\'test_cases\': get_paginator(request, test_cases)}) 95 96 97 # 用例详情页 98 @login_required 99 def test_case_detail(request, test_case_id):100 test_case_id = int(test_case_id)101 test_case = TestCase.objects.get(id=test_case_id)102 print(\"test_case: {}\".format(test_case))103 print(\"test_case.id: {}\".format(test_case.id))104 print(\"test_case.belong_project: {}\".format(test_case.belong_project))105 106 return render(request, \'test_case_detail.html\', {\'test_case\': test_case})107 108 109 # 模块页展示测试用例110 @login_required111 def module_test_cases(request, module_id):112 module = \"\"113 if module_id: # 访问的时候,会从url中提取模块的id,根据模块id查询到模块数据,在模板中展现114 module = Module.objects.get(id=int(module_id))115 test_cases = TestCase.objects.filter(belong_module=module).order_by(\'-id\')116 print(\"test_case in module_test_cases: {}\".format(test_cases))117 return render(request, \'test_case.html\', {\'test_cases\': get_paginator(request, test_cases)})118 119 120 # 用例集合菜单项121 @login_required122 def case_suite(request):123 case_suites = CaseSuite.objects.filter()124 return render(request, \'case_suite.html\', {\'case_suites\': get_paginator(request, case_suites)})125 126 127 # 用例集合-添加测试用例页128 @login_required129 def add_case_in_suite(request, suite_id):130 # 查询指定的用例集合131 case_suite = CaseSuite.objects.get(id=suite_id)132 # 根据id号查询所有的用例133 test_cases = TestCase.objects.filter().order_by(\'id\')134 if request.method == \"GET\":135 print(\"test cases:\", test_cases)136 elif request.method == \"POST\":137 test_cases_list = request.POST.getlist(\'testcases_list\')138 # 如果页面勾选了用例139 if test_cases_list:140 print(\"勾选用例id:\", test_cases_list)141 # 根据页面勾选的用例与查询出的所有用例一一比较142 for test_case in test_cases_list:143 test_case = TestCase.objects.get(id=int(test_case))144 # 匹配成功则添加用例145 SuiteCase.objects.create(case_suite=case_suite, test_case=test_case)146 # 未勾选用例147 else:148 print(\"添加测试用例失败\")149 return HttpResponse(\"添加的测试用例为空,请选择用例后再添加!\")150 return render(request, \'add_case_in_suite.html\',151 {\'test_cases\': get_paginator(request, test_cases), \'case_suite\': case_suite})152 153 154 # 用例集合页-查看/删除用例155 @login_required156 def show_and_delete_case_in_suite(request, suite_id):157 case_suite = CaseSuite.objects.get(id=suite_id)158 test_cases = SuiteCase.objects.filter(case_suite=case_suite)159 if request.method == \"POST\":160 test_cases_list = request.POST.getlist(\'test_cases_list\')161 if test_cases_list:162 print(\"勾选用例:\", test_cases_list)163 for test_case in test_cases_list:164 test_case = TestCase.objects.get(id=int(test_case))165 SuiteCase.objects.filter(case_suite=case_suite, test_case=test_case).first().delete()166 else:167 print(\"测试用例删除失败\")168 return HttpResponse(\"所选测试用例为空,请选择用例后再进行删除!\")169 case_suite = CaseSuite.objects.get(id=suite_id)170 return render(request, \'show_and_delete_case_in_suite.html\',171 {\'test_cases\': get_paginator(request, test_cases), \'case_suite\': case_suite})172 173 174 # 用例执行结果菜单项175 @login_required176 def test_case_execute_record(request):177 test_case_execute_records = TestCaseExecuteResult.objects.filter().order_by(\'-id\')178 return render(request, \'test_case_execute_records.html\', {\'test_case_execute_records\': get_paginator(request, test_case_execute_records)})179 180 181 # 用例执行结果-对比差异182 @login_required183 def case_result_diff(request, test_record_id):184 test_record_data = TestCaseExecuteResult.objects.get(id=test_record_id)185 print(\"用例执行结果记录: {}\".format(test_record_data))186 present_response = test_record_data.response_data187 if present_response:188 present_response = json.dumps(json.loads(present_response), sort_keys=True, indent=4,189 ensure_ascii=False) # 中文字符不转ascii编码190 print(\"当前响应结果: {}\".format(present_response))191 last_time_execute_response = test_record_data.last_time_response_data192 if last_time_execute_response:193 last_time_execute_response = json.dumps(json.loads(last_time_execute_response), sort_keys=True, indent=4,194 ensure_ascii=False)195 print(\"上一次响应结果: {}\".format(last_time_execute_response))196 return render(request, \'case_result_diff.html\', locals())197 198 199 # 用例执行结果-异常信息展示200 @login_required201 def show_exception(request, execute_id):202 test_record = TestCaseExecuteResult.objects.get(id=execute_id)203 return render(request, \'show_exception.html\', {\'exception_info\': test_record.exception_info})204 205 206 # 默认页的视图函数207 @login_required208 def index(request):209 return render(request, \'index.html\')210 211 212 # 登录页的视图函数213 def login(request):214 print(\"request.session.items(): {}\".format(request.session.items())) # 打印session信息215 if request.session.get(\'is_login\', None):216 return redirect(\'/\')217 # 如果是表单提交行为,则进行登录校验218 if request.method == \"POST\":219 login_form = UserForm(request.POST)220 message = \"请检查填写的内容!\"221 if login_form.is_valid():222 username = login_form.cleaned_data[\'username\']223 password = login_form.cleaned_data[\'password\']224 try:225 # 使用django提供的身份验证功能226 user = auth.authenticate(username=username, password=password) # 从auth_user表中匹配信息,匹配到则返回用户对象227 if user is not None:228 print(\"用户【%s】登录成功\" % username)229 auth.login(request, user)230 request.session[\'is_login\'] = True231 # 登录成功,跳转主页232 return redirect(\'/\')233 else:234 message = \"用户名不存在或者密码不正确!\"235 except:236 traceback.print_exc()237 message = \"登录程序出现异常\"238 # 用户名或密码为空,返回登录页和错误提示信息239 else:240 return render(request, \'login.html\', locals())241 # 不是表单提交,代表只是访问登录页242 else:243 login_form = UserForm()244 return render(request, \'login.html\', locals())245 246 247 # 注册页的视图函数248 def register(request):249 return render(request, \'register.html\')250 251 252 # 登出的视图函数:重定向至login视图函数253 @login_required254 def logout(request):255 auth.logout(request)256 request.session.flush()257 return redirect(\"/login/\")
3)定义模板
新增异常信息展示模板:show_exception.html
1 {% extends \'base.html\' %}2 {% load static %}3 {% block title %}异常信息{% endblock %}4 {% block content %}5 6异常信息如下:
7{{ exception_info|default_if_none:\"\" }}
8 9 {% endblock %}
修改用例执行记录模板 test_case_execute_records.html:增加异常信息展示链接
1 {% extends \'base.html\' %} 2 {% load static %} 3 {% block title %}用例执行记录{% endblock %} 4 {% block content %} 5 6776 {% endblock %}8 9
54 55 {# 实现分页标签的代码 #}56 {# 这里使用 bootstrap 渲染页面 #}5710 22 23 24 25 {% for testrecord in test_case_execute_records %}26id 11名称 12请求数据 13执行返回结果 14操作 15断言内容 16执行结果 17异常信息 18请求后提取变量 19开始时间 20执行耗时(ms) 2127 50 {% endfor %}51 52 53{{ testrecord.id }} 28{{ testrecord.belong_test_case.case_name }} 29{{ testrecord.request_data }} 30{{ testrecord.response_data }} 31对比差异 32{{ testrecord.belong_test_case.assert_key }} 33 34 {% ifequal testrecord.execute_result \'成功\' %}35{{ testrecord.execute_result}} 36 {% else %}37{{ testrecord.execute_result}} 38 {% endifequal %}39 40 {% if testrecord.exception_info %}41显示异常信息 42 {% else %}43无 44 {% endif %}45 46{{ testrecord.extract_var }} 47{{ testrecord.execute_start_time }} 48{{ testrecord.execute_total_time }} 4958 7475