111
作者:互联网
import logging
from collections import namedtuple
from itertools import zip_longest
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Sequence

from openpyxl import Workbook, load_workbook
from openpyxl.styles import Font, Alignment, PatternFill, colors
from openpyxl.utils import get_column_letter

from lib import error

logger = logging.getLogger(__name__)

# Key under which load_excel() stores each sheet's header row next to the
# data rows (kept as-is because callers read it by this exact name).
_HEADERS_KEY = 'headers_lis'


def init_log():
    """Configure logging to write to the console and to ``./log/excel.log``.

    The ``./log`` directory is created when it does not exist yet.
    """
    log_dir = Path('./log')
    log_dir.mkdir(parents=True, exist_ok=True)
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s %(name)s: %(message)s',
        handlers=[
            logging.StreamHandler(),
            logging.FileHandler(filename=log_dir / 'excel.log', encoding='utf8'),
        ],
    )


class Field:
    """Plain description of a column: its name, a required flag and an action."""

    def __init__(self, name, required=True, action=None) -> None:
        self.name = name
        self.required = required
        self.action = action


class RowTuple(tuple):
    """A table row that can be indexed by position or by field (header) name."""

    def __new__(cls, fields: Sequence, values: Sequence):
        row = super().__new__(cls, values)
        row.fields = tuple(fields)
        row._map = None  # name -> value mapping, built on first use
        return row

    def _field_map(self) -> dict:
        """Return the field-name -> value mapping, building it on first use."""
        if self._map is None:
            self._map = dict(zip(self.fields, self))
        return self._map

    def __getitem__(self, field):
        """Return the value at an int index / slice, or for a field name.

        Raises:
            KeyError: if ``field`` is not an index and is not a known field.
        """
        if isinstance(field, (int, slice)):
            return super().__getitem__(field)
        return self._field_map()[field]

    def get(self, key, default=None):
        """Return the value for field ``key``, or ``default``.

        ``default`` is returned both when the field is missing and when the
        stored value is falsy (``None``, ``''``, ``0`` ...).
        """
        value = self._field_map().get(key, default)
        return value if value else default


def reset_col(file_path, *, max_width=60, scale=1.2):
    """Restyle every worksheet of a workbook and widen over-long columns.

    For each cell: left-align, centre vertically and enable text wrapping.
    Cells whose text starts with ``=HYPERLINK`` get a blue, italic, underlined
    font and are excluded from the width computation.  A column is widened
    only when its longest text exceeds the current column width; the new
    width is ``longest * scale`` capped at ``max_width``.  The workbook is
    saved back to ``file_path``.

    Args:
        file_path: path of the workbook to modify in place.
        max_width: upper bound for an adjusted column width.
        scale: factor applied to the longest text length.

    Returns:
        True once the workbook has been saved.
    """
    wb = load_workbook(file_path)
    body_alignment = Alignment(horizontal="left", vertical='center', wrapText=True)
    link_font = Font(underline='single', italic=True, color='0066FF')

    # wb.worksheets (unlike wb.sheetnames) leaves out chartsheets, which have
    # no columns to iterate.
    for ws in wb.worksheets:
        for col_number, col in enumerate(ws.columns, start=1):
            letter = get_column_letter(col_number)
            original_width = ws.column_dimensions[letter].width
            max_length = 0
            for cell in col:
                try:
                    cell.alignment = body_alignment
                    text = '' if cell.value is None else str(cell.value)
                    if text.startswith('=HYPERLINK'):
                        cell.font = link_font
                        continue
                    max_length = max(max_length, len(text))
                except Exception as exc:
                    # Styling is best effort: a cell that rejects it is
                    # reported and skipped instead of aborting the whole run.
                    logger.warning('reset_col: could not style cell %s: %s',
                                   getattr(cell, 'coordinate', cell), exc)
            # Never shrink a column below its current width.
            if max_length > original_width:
                adjusted_width = min(max_length * scale, max_width)
            else:
                adjusted_width = original_width
            ws.column_dimensions[letter].width = adjusted_width
    wb.save(file_path)
    return True


def save_excel(file_path, headers_iter: Iterable, rows_iter: Iterable):
    """Create a new workbook and write a header plus rows to its default sheet.

    A cell may hold a file link written as an Excel formula, e.g.
    ``'=HYPERLINK("{}","{}")'.format('./files/222.png', 'popup.jpg')``; a path
    relative to the workbook stays valid when the folder is moved, an
    absolute ``file:///`` path does not.

    Args:
        file_path: destination path of the ``.xlsx`` file.
        headers_iter: header row, e.g. ``(1, 2, 3)``.
        rows_iter: iterable of rows, e.g. ``[(11, None, 13), (21, '', 23)]``;
            ``None`` writes the header only.

    Returns:
        True once the file has been written and restyled by ``reset_col``.
    """
    wb = Workbook()
    ws = wb.active
    ws.append(headers_iter)
    for row in (rows_iter if rows_iter is not None else ()):
        ws.append(row)

    excelpath = Path(file_path).resolve()
    wb.save(excelpath)
    reset_col(excelpath)
    return True


def save_new_sheet(excel_file, sheets_dic, data_only=True):
    """Add (or replace) sheets in an existing workbook and save it.

    Args:
        excel_file: path of the workbook to modify in place.
        sheets_dic: mapping of sheet name -> list of rows.  The optional
            entry ``'mark_flag_lists'`` is not a sheet: it is a list with one
            flag list per data row of the ``'Miss_math'`` sheet (header row
            excluded); cells flagged ``1`` are highlighted.  The mapping is
            not modified.
        data_only: passed to ``load_workbook``.  WARNING: with the default
            ``True`` the workbook is loaded without formulas, so saving it
            replaces every formula in the file by the value openpyxl read;
            pass ``False`` to keep formulas.
    """
    sheets = dict(sheets_dic)
    mark_flag_lists = sheets.pop('mark_flag_lists', None) or []

    workbook = load_workbook(excel_file, data_only=data_only)
    highlight_fill = PatternFill("solid", fgColor="FF6633")
    highlight_font = Font(color=colors.BLACK, bold=True)

    for sheetname, data in sheets.items():
        # Replace a sheet of the same name left over from a previous run.
        if sheetname in workbook.sheetnames:
            workbook.remove(workbook[sheetname])
        ws = workbook.create_sheet(title=sheetname)

        for row_index, row in enumerate(data):
            ws.append(row)
            # Only data rows of the mismatch sheet carry highlight flags.
            if sheetname != 'Miss_math' or row_index == 0:
                continue
            if row_index - 1 >= len(mark_flag_lists):
                continue
            flags = mark_flag_lists[row_index - 1]
            for column_index, flag in enumerate(flags[:len(row)]):
                if flag == 1:
                    # Worksheet rows/columns are 1-based.
                    cell = ws.cell(row_index + 1, column_index + 1)
                    cell.fill = highlight_fill
                    cell.font = highlight_font

    excelpath = Path(excel_file).resolve()
    workbook.save(excelpath)
    reset_col(excelpath)


def load_excel(excel_file: str, primary_key_lis: list,
               sheet_names: Optional[Iterable] = None) -> List[Any]:
    """Load the rows of the given sheets, keyed by a composite primary key.

    The first row of every sheet is taken as the header.

    Args:
        excel_file: path of the workbook.
        primary_key_lis: 0-based column indexes; the ``str()`` of those
            columns joined with ``'_'`` forms the key of a row.
        sheet_names: sheets to read; ``None`` or empty reads all sheets.

    Returns:
        ``[result, duplicate_rows_list, rows_num_list]`` where

        * ``result`` maps sheet name -> dict of primary key -> ``RowTuple``,
          plus the entry ``'headers_lis'`` holding the header list;
        * ``duplicate_rows_list`` holds ``[sheet_name, *row_values]`` for
          every row whose key was already seen in its sheet (only the first
          occurrence is kept in ``result``);
        * ``rows_num_list`` holds the number of data rows read per sheet, in
          the order the sheets were read.

    Raises:
        error.Error: if a requested sheet does not exist.
    """
    logger.info(f'load_excel: primary_key_lis:{primary_key_lis}, sheet_names:{sheet_names}')
    workbook = load_workbook(excel_file, data_only=True)
    if not sheet_names:
        sheet_names = workbook.sheetnames

    result = {}
    duplicate_rows_list = []
    rows_num_list = []
    for ws_name in sheet_names:
        logger.info(f'load_excel: start load {ws_name}')
        if ws_name not in workbook.sheetnames:
            logger.error(f'表格"{ws_name}"不存在。')
            raise error.Error(f'表格"{ws_name}"不存在。')
        sheet = workbook[ws_name]
        header = [c.value for c in sheet[1]]

        rows_dic = {}
        rows = 0
        for values in sheet.iter_rows(min_row=2, values_only=True):
            rows += 1
            rows_dic_key = '_'.join(str(values[index]) for index in primary_key_lis)
            # A repeated key means bad data or a poorly chosen primary key.
            if rows_dic_key in rows_dic:
                logger.warning(f'find duplicate_rows :{rows_dic_key} value:{values}')
                duplicate_rows_list.append([ws_name, *values])
                continue
            rows_dic[rows_dic_key] = RowTuple(header, values)
        rows_num_list.append(rows)
        # Stored alongside the rows so callers can rebuild a header line.
        rows_dic[_HEADERS_KEY] = header
        result[ws_name] = rows_dic

    return [result, duplicate_rows_list, rows_num_list]


def _diff_flags(row1: Sequence, row2: Sequence, ignored: set) -> List[int]:
    """Return one flag per column: 1 where the rows differ, else 0.

    Ignored columns are always 0; a column missing from the shorter row is
    compared as ``None``.
    """
    return [
        0 if index in ignored or left == right else 1
        for index, (left, right) in enumerate(zip_longest(row1, row2))
    ]


def compare_excels(excel_file: str, primary_key_lis: list, sheet1: str, sheet2: str,
                   ignore_columns_list: Sequence = ()):
    """Compare two sheets of one workbook row by row and write the differences.

    Rows are paired through the primary key built by ``load_excel``.  The
    sheets ``<sheet1>_only``, ``<sheet2>_only``, ``Miss_math`` and
    ``Duplicate`` are (re)written into the same workbook; in ``Miss_math``
    both versions of a differing row are listed, prefixed with their source
    sheet, and the differing cells are highlighted.  The header rows of the
    two sheets are compared like a data row (so differing headers show up in
    ``Miss_math``) but are never counted as a matching row.  A summary is
    logged.

    Args:
        excel_file: path of the workbook (modified in place).
        primary_key_lis: 0-based column indexes forming the primary key,
            e.g. ``[0, 1]``.
        sheet1: name of the first sheet.
        sheet2: name of the second sheet.
        ignore_columns_list: 0-based column indexes excluded from comparison.
    """
    logger.info(f'compare_excels: primary_key_lis:{primary_key_lis}, sheet1:{sheet1}, sheet2:{sheet2}, ignore_columns_list:{ignore_columns_list}')

    result, duplicate_rows_list, rows_num_list = load_excel(
        excel_file=excel_file, primary_key_lis=primary_key_lis, sheet_names=[sheet1, sheet2])
    sheet1_dic = result[sheet1]
    sheet2_dic = result[sheet2]
    sheet1_num, sheet2_num = rows_num_list

    # Rows present in only one sheet, in their original sheet order.  The
    # header entry exists in both dicts and is therefore never listed here.
    sheet1_only_list = [sheet1_dic[_HEADERS_KEY]]
    sheet1_only_list.extend(row for key, row in sheet1_dic.items() if key not in sheet2_dic)
    sheet2_only_list = [sheet2_dic[_HEADERS_KEY]]
    sheet2_only_list.extend(row for key, row in sheet2_dic.items() if key not in sheet1_dic)

    miss_math_list = [['source', *sheet1_dic[_HEADERS_KEY]]]
    mark_flag_lists = []  # one flag list per Miss_math data row
    match_num = 0
    miss_math_num = 0  # number of differing cells
    ignored = set(ignore_columns_list)

    for key, row_sheet1 in sheet1_dic.items():
        if key not in sheet2_dic:
            continue
        row_sheet2 = sheet2_dic[key]
        flags = _diff_flags(row_sheet1, row_sheet2, ignored)
        differing = sum(flags)
        if differing:
            miss_math_num += differing
            miss_math_list.append([sheet1, *row_sheet1])
            miss_math_list.append([sheet2, *row_sheet2])
            # The leading 'source' column is never highlighted; the same
            # flags apply to both listed rows.
            row_flags = [0, *flags]
            mark_flag_lists.append(row_flags)
            mark_flag_lists.append(row_flags)
        elif key != _HEADERS_KEY:
            match_num += 1

    sheets_dict = {
        f'{sheet1}_only': sheet1_only_list,
        f'{sheet2}_only': sheet2_only_list,
        'Miss_math': miss_math_list,
        'Duplicate': duplicate_rows_list,
        'mark_flag_lists': mark_flag_lists,  # [[0,0,1], [1,0,0], ...]
    }
    save_new_sheet(excel_file, sheets_dic=sheets_dict)

    duplicate_rows = len(duplicate_rows_list)
    sheet1_only = len(sheet1_only_list) - 1  # minus the header line
    sheet2_only = len(sheet2_only_list) - 1
    logger.info(f'{sheet1}:{sheet1_num}, {sheet2}:{sheet2_num}, match_num:{match_num}, miss_math_num:{miss_math_num},duplicate_rows:{duplicate_rows}, {sheet1}_only:{sheet1_only}, {sheet2}_only:{sheet2_only}')
标签:rows,sheet,list,sheet1,key,sheet2,111 来源: https://www.cnblogs.com/xp1315458571/p/16310151.html