本文共 12236 字,大约阅读时间需要 40 分钟。
一、 接口 自动化布局
零、数据库
import osimport configparserimport pymysql# ========Reading db_config.ini setting ========# base_dir = os.path.dirname(os.path.abspath(__file__))# print(base_dir)# base_dir = base_dir.replace("\\", "/")# file_path = base_dir + "/db_config.ini"# print(file_path)# # cf = cparser.ConfigParser()# cf = configparser.ConfigParser()# cf.read(file_path)cf = configparser.ConfigParser()cf.read('../db_fixture/db_config.ini', encoding='utf-8')host = cf.get("mysqlconf", "host")port = cf.get("mysqlconf", "port")db = cf.get("mysqlconf", "db_name")user = cf.get("mysqlconf", "user")password = cf.get("mysqlconf", "password")# print("host=", host,"port=",port,"db=",db,"user=",user,"passwprd=",password)# ======== Mysql base operating ========class DB: def __init__(self): try: # Connect to the database self.connection = pymysql.Connect(host=str(host), port=int(port), user=str(user), password=str(password), db=str(db), charset="utf8mb4") except pymysql.err.OperationalError as e: print("Mysql Error %d: %s" % (e.args[0], e.args[1])) # select count(data) from table def select_count(self, column_name, table_name, condition): real_sql = "select {} from {} where {};".format(column_name, table_name, condition) print(real_sql) with self.connection.cursor() as cursor: count = cursor.execute(real_sql) # print('there has %s rows record' %count) result = cursor.fetchall() #select_result = [] for i in result: print(i) #m = select_result.append(i) #print('select_result',m) cursor.close() return count # clear table data def clear(self, table_name, condition): # real_sql = "truncate table" + table_name + ";" # real_sql = "delete from " + table_name + " where "+condition+";" real_sql = "delete from {} where {};".format(table_name, condition) print(real_sql) with self.connection.cursor() as cursor: cursor.execute(real_sql) print("The data --- %s--- has been deleted!" %condition) self.connection.commit() # insert sql statement def insert(self, table_name, key, value): # for key in table_data: # table_data[key] = " '" + str(table_data[key]) + "'" # key = ','.join(table_data.keys()) # value = ','.join(table_data.values()) # real_sql = 'INSERT INTO {} ({}) VALUES ({}) '.format(table_name, key, value) # print(real_sql) # with self.connection.cursor() as cursor: # cursor.execute(real_sql) # for key, value in table_data.items(): # real_sql = 'INSERT INTO {} ({}) VALUES ({}) '.format(table_name, key, value) # print(real_sql) # with self.connection.cursor() as cursor: # cursor.execute(real_sql) real_sql = 'INSERT INTO {} ({}) VALUES ({}) '.format(table_name, key, value) with self.connection.cursor() as cursor: cursor.execute(real_sql) self.connection.commit() # close database def close(self): self.connection.close() # init data def init_data(self, table_name, key, value): # for table, data in datas.items(): # #self.clear(table) # for d in data: # self.insert(table, d) # self.close() real_sql = 'INSERT INTO {} ({}) VALUES ({}) '.format(table_name, key, value) print(real_sql) with self.connection.cursor() as cursor: cursor.execute(real_sql) self.connection.commit() # select data from table def select_data(self, column_name, table_name, condition): # real_sql = "select *" + from + table_name + where + condition +";" # real_sql = "select " + column_name + " from " + table_name + " where " + condition + ";" real_sql = "select {} from {} where {} ;".format(column_name,table_name,condition) print(real_sql) with self.connection.cursor() as cursor: count = cursor.execute(real_sql) print('there has %s rows record' % count) result = cursor.fetchall() select_data=[] for i in result: select_data.append(i) print(select_data) cursor.close() return list(select_data)if __name__ == '__main__': holiday_name = '元宵4' datas = {'start_time': '2018-01-03', 'end_time': '2018-01-03', 'holiday_name': holiday_name, 'holiday_type': '4', 'main_holiday_id': '175'} table_name = 'tt_ts_forecast_holiday' key = 'holiday_type, start_time, end_time, holiday_name, main_holiday_id' value = "'4','2018-01-03','2018-01-03','元宵4','175'" DB().init_data(table_name, key, value) # 重置查询数据
一、增加
import os,sysimport unittestfrom common import methodfrom config import configfrom db_fixture import mysql_db#登录主页login_url = config.login_urllogin_data = config.login_datalogin = method.login(login_url, login_data=login_data)class AddHolidaies (unittest.TestCase): """添加主假日""" def setUp(self): self.base_url = config.t3_url def tearDown(self): print(self.result) def test_addMainHolidaies(self): jsondata = method.fetch_jsondata_from_file('../file/t3') # 1.一般情况 holiday_name = '元宵4' jsondata['holidayName'] = holiday_name t3_column_name = '*' t3_data_base = 'tt_ts_forecast_holiday' t3_condition = 'holiday_name = "{}" and is_delete=0 '.format(holiday_name) method.database_de_duplication(t3_column_name, t3_data_base, t3_condition) # 数据库去重 self.result = method.postdata(self.base_url, jsondata) # 发送请求 self.assertEqual(0, self.result['code']) self.assertEqual(1, self.result['data']) self.assertEqual('SUCCESS', self.result['message']) mysql_db.DB().clear(t3_data_base, t3_condition) # 清除刚添加的数据 # 2.日期大于20天 new_req = method.modify_value(jsondata, startTime='2018-04-01', endTime='2018-04-22') self.result = method.postdata(self.base_url, new_req) self.assertEqual(2001103, self.result['code']) self.assertEqual(0, self.result['data']) message = 'The number of days between the start and end dates cannot be greater than 20' self.assertEqual(message, self.result['message'])if __name__ == "__main__": # test_data.init_data() # 初始化接口测试数据 unittest.main() # 运行测试集
二、查
import unittestfrom config import configfrom db_fixture import mysql_dbfrom common import method# 登录login_url = config.login_urllogin_data = config.login_datalogin = method.login(login_url, login_data=login_data)class ListHolidaies(unittest.TestCase): def setUp(self): self.t4_url = config.t4_url def tearDown(self): print(self.result) def test_listHolidaies(self): holiday_name = '元宵4' t3_column_name = '*' t3_data_base = 'tt_ts_forecast_holiday' t3_condition = 'holiday_name = "{}" and is_delete=0 '.format(holiday_name) method.database_de_duplication(t3_column_name, t3_data_base, t3_condition) # 去重 key = 'holiday_type, start_time, end_time, holiday_name, main_holiday_id, update_time, create_time, is_delete' value = "'4','2018-01-03','2018-01-03','元宵4','175', '2019-03-29 22:49:11', '2019-03-29 22:49:11', '0'" mysql_db.DB().init_data(t3_data_base, key, value) # 重置查询数据 self.result = method.postdata(self.t4_url, method.fetch_jsondata_from_file('../file/t4')) print(self.result) # 提取查询结果 ,对比返回结果 self.assertEqual(0, self.result['code']) self.assertEqual('SUCCESS', self.result['message']) self.assertIn('元宵4', str(self.result['data'])) method.database_de_duplication(t3_column_name, t3_data_base, t3_condition)if __name__ == '__main__': unittest.main()
3.删除
import os,sysimport unittestfrom db_fixture import mysql_dbfrom config import configfrom common import methodimport copyparentpath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))sys.path.insert(0, parentpath)# 登录主页login_url = config.login_urllogin_data = config.login_datalogin = method.login(login_url, login_data=login_data)class deleteHoliday_test(unittest.TestCase): def setUp(self): self.base_url = config.t5_url def tearDown(self): print(self.result) def test_deleteHoliday(self): """去重""" holiday_name = '元宵4' t3_column_name = '*' t3_data_base = 'tt_ts_forecast_holiday' t3_condition = 'holiday_name = "{}" and is_delete=0 '.format(holiday_name) main_holiday_id = '175' method.database_de_duplication(t3_column_name, t3_data_base, t3_condition) # 去重 """重置查询数据""" key = 'holiday_type, start_time, end_time, holiday_name, main_holiday_id, update_time, create_time, is_delete' value = "'4','2018-01-03','2018-01-03','元宵4',{}, '2019-03-29 22:49:11', " \ "'2019-03-29 22:49:11', '0'".format(main_holiday_id) mysql_db.DB().init_data(t3_data_base, key, value) # 重置查询数据 """设置测试数据""" id_condition = 'holiday_name = "元宵4" AND is_delete = 0 order by id DESC' id_select = mysql_db.DB().select_data('id', t3_data_base, id_condition) id_select = id_select[0][0] json_data = method.fetch_jsondata_from_file('../file/t5') json_data["id"] = int(id_select) json_data["mainHolidayId"] = int(main_holiday_id) self.result = method.postdata(config.t5_url, json_data) self.assertEqual(0, self.result['code']) self.assertEqual(1, self.result['data']) self.assertEqual('SUCCESS', self.result['message'])if __name__ == "__main__": # test_data.init_data() # 初始化接口测试数据 unittest.main() # 运行测试集
4.改
import unittestfrom config import configfrom common import methodfrom db_fixture import mysql_dbimport osimport sysparentpath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))sys.path.insert(0, parentpath)# 登录主页login_url = config.login_urllogin_data = config.login_datalogin = method.login(login_url, login_data=login_data)class updateHoliday_test(unittest.TestCase): def SetUp(self): self.t6_url = config.t6_url def TearDown(self): print(self.result) def test_updateHoliday(self): """去重""" holiday_name = '元宵4' t3_column_name = '*' t3_data_base = 'tt_ts_forecast_holiday' t3_condition = 'holiday_name = "{}" and is_delete=0 '.format(holiday_name) main_holiday_id = '175' method.database_de_duplication(t3_column_name, t3_data_base, t3_condition) # 去重 """重置数据""" key = 'holiday_type, start_time, end_time, holiday_name, main_holiday_id, update_time, create_time, is_delete' value = "'4','2018-01-03','2018-01-03','元宵4',{}, '2019-03-29 22:49:11', " \ "'2019-03-29 22:49:11', '0'".format(main_holiday_id) mysql_db.DB().init_data(t3_data_base, key, value) # 重置查询数据 select_id_data = mysql_db.DB().select_data('id', t3_data_base, t3_condition) # 获取id """设置请求数据""" json_data = method.fetch_jsondata_from_file('../file/t6') select_id = select_id_data[0][0] json_data['id'] = int(select_id) self.result = method.postdata(self.t6_url, json_data) coloum = 'start_time' condition = 'id = {}'.format(select_id) select_data = mysql_db.DB().select_data(coloum, t3_data_base, condition) print(select_data) self.assertEqual(self.result['code'], 0) self.assertEqual(self.result['data'], 1) self.assertEqual(self.result['message'], 'SUCCESS')if __name__ == '__main__': unittest.main()
转载地址:http://fmmwb.baihongyu.com/