[API Test Automation]
Published: 2019-05-01


I. API automation layout

0. Database

import configparser

import pymysql

# ======== Read settings from db_config.ini ========
# The path is relative to the current working directory, so the tests are
# expected to be run from a sibling directory of db_fixture/.
cf = configparser.ConfigParser()
cf.read('../db_fixture/db_config.ini', encoding='utf-8')
host = cf.get("mysqlconf", "host")
port = cf.get("mysqlconf", "port")
db = cf.get("mysqlconf", "db_name")
user = cf.get("mysqlconf", "user")
password = cf.get("mysqlconf", "password")


# ======== Basic MySQL operations ========
class DB:
    def __init__(self):
        try:
            # Connect to the database
            self.connection = pymysql.connect(host=str(host),
                                              port=int(port),
                                              user=str(user),
                                              password=str(password),
                                              database=str(db),
                                              charset="utf8mb4")
        except pymysql.err.OperationalError as e:
            print("Mysql Error %d: %s" % (e.args[0], e.args[1]))
            raise  # without a connection the object cannot be used

    # run a select, print the rows, and return the number of matching rows
    def select_count(self, column_name, table_name, condition):
        real_sql = "select {} from {} where {};".format(column_name, table_name, condition)
        print(real_sql)
        with self.connection.cursor() as cursor:
            count = cursor.execute(real_sql)
            for row in cursor.fetchall():
                print(row)
        return count

    # delete the rows that match the condition
    def clear(self, table_name, condition):
        real_sql = "delete from {} where {};".format(table_name, condition)
        print(real_sql)
        with self.connection.cursor() as cursor:
            cursor.execute(real_sql)
            print("The data --- %s--- has been deleted!" % condition)
        self.connection.commit()

    # insert one row; key is a comma-separated column list,
    # value is a comma-separated list of already-quoted values
    def insert(self, table_name, key, value):
        real_sql = 'INSERT INTO {} ({}) VALUES ({}) '.format(table_name, key, value)
        with self.connection.cursor() as cursor:
            cursor.execute(real_sql)
        self.connection.commit()

    # close the database connection
    def close(self):
        self.connection.close()

    # insert the initial test data (same statement as insert, but logged)
    def init_data(self, table_name, key, value):
        real_sql = 'INSERT INTO {} ({}) VALUES ({}) '.format(table_name, key, value)
        print(real_sql)
        with self.connection.cursor() as cursor:
            cursor.execute(real_sql)
        self.connection.commit()

    # select data from a table and return the rows as a list of tuples
    def select_data(self, column_name, table_name, condition):
        real_sql = "select {}  from {} where {} ;".format(column_name, table_name, condition)
        print(real_sql)
        with self.connection.cursor() as cursor:
            count = cursor.execute(real_sql)
            print('there are %s matching rows' % count)
            select_data = list(cursor.fetchall())
        print(select_data)
        return select_data


if __name__ == '__main__':
    holiday_name = '元宵4'
    table_name = 'tt_ts_forecast_holiday'
    key = 'holiday_type, start_time, end_time, holiday_name, main_holiday_id'
    value = "'4','2018-01-03','2018-01-03','{}','175'".format(holiday_name)
    DB().init_data(table_name, key, value)  # reset the query data
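The helpers above splice column lists and values into the SQL text with str.format, so quoting is left to the caller (see the hand-quoted value string in the __main__ block). A minimal sketch of an alternative, assuming the row is available as a dict: build the statement with %s placeholders, which is the style pymysql's cursor.execute(sql, args) accepts, and pass the values separately. build_insert is a hypothetical helper and is not part of the original module.

```python
def build_insert(table_name, row):
    """Return (sql, params) for a parameterized INSERT.

    Hypothetical helper, not part of the original DB class. Table and column
    names cannot be bound as parameters, so they must come from trusted code.
    """
    columns = list(row)
    column_list = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    sql = "INSERT INTO {} ({}) VALUES ({})".format(table_name, column_list, placeholders)
    params = tuple(row[column] for column in columns)
    return sql, params


if __name__ == "__main__":
    sql, params = build_insert(
        "tt_ts_forecast_holiday",
        {
            "holiday_type": "4",
            "start_time": "2018-01-03",
            "end_time": "2018-01-03",
            "holiday_name": "元宵4",
            "main_holiday_id": "175",
        },
    )
    print(sql)
    print(params)
    # With an open pymysql connection the pair would be used as:
    #     with connection.cursor() as cursor:
    #         cursor.execute(sql, params)
    #     connection.commit()
```

Because the driver escapes the bound values, a holiday name containing a quote character no longer breaks the statement.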

1. Add

import unittest

from common import method
from config import config
from db_fixture import mysql_db

# Log in to the home page
login_url = config.login_url
login_data = config.login_data
login = method.login(login_url, login_data=login_data)


class AddHolidaies(unittest.TestCase):
    """Add a main holiday"""

    def setUp(self):
        self.base_url = config.t3_url
        self.result = None  # lets tearDown run even if the request was never sent

    def tearDown(self):
        print(self.result)

    def test_addMainHolidaies(self):
        jsondata = method.fetch_jsondata_from_file('../file/t3')

        # 1. Normal case
        holiday_name = '元宵4'
        jsondata['holidayName'] = holiday_name
        t3_column_name = '*'
        t3_data_base = 'tt_ts_forecast_holiday'
        t3_condition = 'holiday_name = "{}" and is_delete=0 '.format(holiday_name)
        method.database_de_duplication(t3_column_name,
                                       t3_data_base,
                                       t3_condition)  # remove existing duplicates from the database
        self.result = method.postdata(self.base_url, jsondata)  # send the request
        self.assertEqual(0, self.result['code'])
        self.assertEqual(1, self.result['data'])
        self.assertEqual('SUCCESS', self.result['message'])
        mysql_db.DB().clear(t3_data_base, t3_condition)  # delete the row that was just added

        # 2. Date range longer than 20 days
        new_req = method.modify_value(jsondata, startTime='2018-04-01', endTime='2018-04-22')
        self.result = method.postdata(self.base_url, new_req)
        self.assertEqual(2001103, self.result['code'])
        self.assertEqual(0, self.result['data'])
        message = 'The number of days between the start and end dates cannot be greater than 20'
        self.assertEqual(message, self.result['message'])


if __name__ == "__main__":
    # test_data.init_data()  # initialize the API test data
    unittest.main()  # run the test suite
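The common.method module is not shown in this post, so the behaviour of method.modify_value can only be inferred from the call above: it takes the base request body plus keyword overrides and returns a new request. One way such a helper could look, purely as an assumption, is a deep copy followed by an update, so that the base payload stays reusable for the next case.

```python
import copy


def modify_value(payload, **overrides):
    """Return a copy of payload with the given keys replaced.

    Hypothetical stand-in for common.method.modify_value; the real
    implementation is not shown in the post.
    """
    new_payload = copy.deepcopy(payload)  # leave the caller's dict untouched
    new_payload.update(overrides)
    return new_payload


if __name__ == "__main__":
    # Illustrative payload; the real body is loaded from ../file/t3.
    base = {"holidayName": "元宵4", "startTime": "2018-01-03", "endTime": "2018-01-03"}
    too_long = modify_value(base, startTime="2018-04-01", endTime="2018-04-22")
    print(base)
    print(too_long)
```

Copying first means the normal case and the "more than 20 days" case can start from the same dict without one mutating the other.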

2. Query

import unittest

from config import config
from db_fixture import mysql_db
from common import method

# Log in
login_url = config.login_url
login_data = config.login_data
login = method.login(login_url, login_data=login_data)


class ListHolidaies(unittest.TestCase):
    def setUp(self):
        self.t4_url = config.t4_url
        self.result = None  # lets tearDown run even if the request was never sent

    def tearDown(self):
        print(self.result)

    def test_listHolidaies(self):
        holiday_name = '元宵4'
        t3_column_name = '*'
        t3_data_base = 'tt_ts_forecast_holiday'
        t3_condition = 'holiday_name = "{}" and is_delete=0 '.format(holiday_name)
        method.database_de_duplication(t3_column_name,
                                       t3_data_base,
                                       t3_condition)  # remove existing duplicates
        key = 'holiday_type, start_time, end_time, holiday_name, main_holiday_id, update_time, create_time, is_delete'
        value = "'4','2018-01-03','2018-01-03','元宵4','175', '2019-03-29 22:49:11', '2019-03-29 22:49:11', '0'"
        mysql_db.DB().init_data(t3_data_base, key, value)  # reset the query data
        self.result = method.postdata(self.t4_url, method.fetch_jsondata_from_file('../file/t4'))

        # Compare the response with the seeded row
        self.assertEqual(0, self.result['code'])
        self.assertEqual('SUCCESS', self.result['message'])
        self.assertIn('元宵4', str(self.result['data']))
        method.database_de_duplication(t3_column_name,
                                       t3_data_base,
                                       t3_condition)  # clean up the seeded row


if __name__ == '__main__':
    unittest.main()
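The query test removes its seed row in the last statement of the test body, so a failing assertion skips the cleanup and leaves the row behind until the next run removes it. A minimal sketch of registering the cleanup with unittest's addCleanup instead, which runs whether or not the test passes. An in-memory SQLite database stands in for the MySQL test database here, and the table has only three of the real columns; both are assumptions made so that the sketch runs on its own.

```python
import sqlite3
import unittest

TABLE = "tt_ts_forecast_holiday"
HOLIDAY_NAME = "元宵4"


class ListHolidaySketch(unittest.TestCase):
    def setUp(self):
        # In-memory SQLite stands in for the MySQL test database (assumption).
        self.conn = sqlite3.connect(":memory:")
        self.addCleanup(self.conn.close)
        self.conn.execute(
            "CREATE TABLE {} (id INTEGER PRIMARY KEY, "
            "holiday_name TEXT, is_delete INTEGER)".format(TABLE)
        )
        # Seed the row the query is expected to return.
        self.conn.execute(
            "INSERT INTO {} (holiday_name, is_delete) VALUES (?, ?)".format(TABLE),
            (HOLIDAY_NAME, 0),
        )
        self.conn.commit()
        # Cleanups run in reverse order: the row is removed, then the
        # connection is closed, even when an assertion in the test fails.
        self.addCleanup(self.remove_seed_row)

    def remove_seed_row(self):
        self.conn.execute(
            "DELETE FROM {} WHERE holiday_name = ?".format(TABLE), (HOLIDAY_NAME,)
        )
        self.conn.commit()

    def test_seed_row_is_listed(self):
        rows = self.conn.execute(
            "SELECT holiday_name FROM {} WHERE is_delete = 0".format(TABLE)
        ).fetchall()
        self.assertIn((HOLIDAY_NAME,), rows)


if __name__ == "__main__":
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(ListHolidaySketch)
    unittest.TextTestRunner(verbosity=2).run(suite)
```

With a real shared database the same shape applies: seed in setUp, register the delete with addCleanup, and keep the test body to the request and its assertions.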

3. Delete

import os
import sys
import unittest

# Put the project root on sys.path before importing the project packages
parentpath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, parentpath)

from db_fixture import mysql_db
from config import config
from common import method

# Log in to the home page
login_url = config.login_url
login_data = config.login_data
login = method.login(login_url, login_data=login_data)


class deleteHoliday_test(unittest.TestCase):
    def setUp(self):
        self.base_url = config.t5_url
        self.result = None  # lets tearDown run even if the request was never sent

    def tearDown(self):
        print(self.result)

    def test_deleteHoliday(self):
        # Remove existing duplicates
        holiday_name = '元宵4'
        t3_column_name = '*'
        t3_data_base = 'tt_ts_forecast_holiday'
        t3_condition = 'holiday_name = "{}" and is_delete=0 '.format(holiday_name)
        main_holiday_id = '175'
        method.database_de_duplication(t3_column_name,
                                       t3_data_base,
                                       t3_condition)

        # Reset the query data
        key = 'holiday_type, start_time, end_time, holiday_name, main_holiday_id, update_time, create_time, is_delete'
        value = "'4','2018-01-03','2018-01-03','元宵4',{}, '2019-03-29 22:49:11', " \
                "'2019-03-29 22:49:11', '0'".format(main_holiday_id)
        mysql_db.DB().init_data(t3_data_base, key, value)

        # Build the request from the id of the row that was just inserted
        id_condition = 'holiday_name = "元宵4" AND is_delete = 0 order by id DESC'
        id_select = mysql_db.DB().select_data('id', t3_data_base, id_condition)
        id_select = id_select[0][0]
        json_data = method.fetch_jsondata_from_file('../file/t5')
        json_data["id"] = int(id_select)
        json_data["mainHolidayId"] = int(main_holiday_id)
        self.result = method.postdata(self.base_url, json_data)
        self.assertEqual(0, self.result['code'])
        self.assertEqual(1, self.result['data'])
        self.assertEqual('SUCCESS', self.result['message'])


if __name__ == "__main__":
    # test_data.init_data()  # initialize the API test data
    unittest.main()  # run the test suite

4. Update

import os
import sys
import unittest

# Put the project root on sys.path before importing the project packages
parentpath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, parentpath)

from config import config
from common import method
from db_fixture import mysql_db

# Log in to the home page
login_url = config.login_url
login_data = config.login_data
login = method.login(login_url, login_data=login_data)


class updateHoliday_test(unittest.TestCase):
    # unittest only calls the hooks when they are spelled setUp / tearDown
    def setUp(self):
        self.t6_url = config.t6_url
        self.result = None  # lets tearDown run even if the request was never sent

    def tearDown(self):
        print(self.result)

    def test_updateHoliday(self):
        # Remove existing duplicates
        holiday_name = '元宵4'
        t3_column_name = '*'
        t3_data_base = 'tt_ts_forecast_holiday'
        t3_condition = 'holiday_name = "{}" and is_delete=0 '.format(holiday_name)
        main_holiday_id = '175'
        method.database_de_duplication(t3_column_name,
                                       t3_data_base,
                                       t3_condition)

        # Reset the data
        key = 'holiday_type, start_time, end_time, holiday_name, main_holiday_id, update_time, create_time, is_delete'
        value = "'4','2018-01-03','2018-01-03','元宵4',{}, '2019-03-29 22:49:11', " \
                "'2019-03-29 22:49:11', '0'".format(main_holiday_id)
        mysql_db.DB().init_data(t3_data_base, key, value)
        select_id_data = mysql_db.DB().select_data('id', t3_data_base, t3_condition)  # fetch the id

        # Build the request
        json_data = method.fetch_jsondata_from_file('../file/t6')
        select_id = select_id_data[0][0]
        json_data['id'] = int(select_id)
        self.result = method.postdata(self.t6_url, json_data)

        # Read the updated column back from the database
        column = 'start_time'
        condition = 'id = {}'.format(select_id)
        select_data = mysql_db.DB().select_data(column, t3_data_base, condition)
        print(select_data)
        self.assertEqual(self.result['code'], 0)
        self.assertEqual(self.result['data'], 1)
        self.assertEqual(self.result['message'], 'SUCCESS')


if __name__ == '__main__':
    unittest.main()
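The update test reads start_time back from the database but only prints it, so a response of SUCCESS passes even if the row did not change. A minimal sketch of turning that read into an assertion. FakeDB is a stand-in for mysql_db.DB so the example runs without MySQL, first_value is a hypothetical helper, and the expected date is an invented value; the sketch also assumes the column is a DATE, which pymysql returns as datetime.date.

```python
import datetime
import unittest


def first_value(rows):
    """Return the first column of the first row, or None when no row matched."""
    return rows[0][0] if rows else None


class FakeDB:
    """Stand-in for mysql_db.DB so the sketch runs without MySQL (assumption)."""

    def select_data(self, column_name, table_name, condition):
        # Same shape as DB.select_data: a list of row tuples.
        return [(datetime.date(2018, 1, 5),)]


class UpdateCheckSketch(unittest.TestCase):
    def test_start_time_was_updated(self):
        expected_start = "2018-01-05"  # hypothetical value sent in the update request
        rows = FakeDB().select_data("start_time", "tt_ts_forecast_holiday", "id = 1")
        actual = first_value(rows)
        self.assertIsNotNone(actual, "no row matched the id")
        # str() of a datetime.date is ISO formatted, e.g. 2018-01-05
        self.assertEqual(expected_start, str(actual))


if __name__ == "__main__":
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(UpdateCheckSketch)
    unittest.TextTestRunner(verbosity=2).run(suite)
```

If the column is a DATETIME instead, the string form also carries a time part, so the comparison would need to use actual.date() or a full timestamp.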

 

Reposted from: http://fmmwb.baihongyu.com/
