其他分享
首页 > 其他分享 > 111

111

作者:互联网

import logging
from collections import namedtuple
from pathlib import Path
from typing import Dict, Iterable, Sequence, List

from openpyxl import Workbook, load_workbook
from openpyxl.styles import Font, Alignment, PatternFill, colors
from openpyxl.utils import get_column_letter

from lib import error

logger = logging.getLogger(__name__)


def init_log():
    """Configure root logging to write to the console and to ``./log/excel.log``.

    The ``./log`` directory (relative to the current working directory) is
    created when it is missing.  Records are emitted at INFO level and above
    to both a stream handler and a UTF-8 encoded file handler.
    """
    log_dir = Path('./log')
    # exist_ok=True replaces a separate exists() check, which is race-prone
    # when two processes start at the same time.
    log_dir.mkdir(parents=True, exist_ok=True)
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s %(levelname)s %(name)s: %(message)s',
        handlers=[
            logging.StreamHandler(),
            logging.FileHandler(filename=log_dir / 'excel.log', encoding='utf8'),
        ],
    )

class Field:
    """Plain holder for the description of one table field.

    The three constructor arguments are stored unchanged as the attributes
    ``name``, ``required`` and ``action``.  How ``action`` is used is not
    visible in this module -- presumably a callable applied to the field's
    value; confirm against the callers.
    """

    def __init__(self, name, required=True, action=None) -> None:
        self.name, self.required, self.action = name, required, action


class RowTuple(tuple):
    """A table row that can be read by position or by field name.

    The instance is a regular tuple of the row's values.  In addition it
    carries ``fields`` (the header names, as a tuple) and a lazily built
    name -> value mapping, so ``row[0]`` and ``row['some header']`` both work.
    When header names repeat, the last column with that name wins in the
    name lookup.
    """

    def __new__(cls, fields: Sequence, values: Sequence):
        """Create the row from the header names and the row's cell values."""
        row = super().__new__(cls, values)
        row.fields = tuple(fields)
        # The name -> value mapping is only built on first keyed access.
        row._map = None
        return row

    def _mapping(self):
        """Return the name -> value mapping, building it on first use."""
        if self._map is None:
            self._map = dict(zip(self.fields, self))
        return self._map

    def __getitem__(self, field):
        """Return a value by integer index / slice, or by field name.

        Invoked for subscript access (``row[x]``).  Integers and slices are
        handled as ordinary tuple indexing; anything else is looked up as a
        field name and raises ``KeyError`` when unknown.
        """
        if isinstance(field, (int, slice)):
            return super().__getitem__(field)
        return self._mapping()[field]

    def get(self, key, default=None):
        """Return the value for field name ``key``, or ``default``.

        ``default`` is returned both when the field is unknown and when the
        stored value is falsy (``None``, ``''``, ``0`` ...), so empty cells
        are treated like missing ones.
        """
        # Use the lazily-built mapping: reading self._map directly fails with
        # AttributeError when get() is called before any keyed __getitem__.
        value = self._mapping().get(key, default)
        return value if value else default


# def load_excel(excel_file: str, ws_names: dict):
#     wb = load_workbook(excel_file, data_only=True)
#     values = {}
#     for ws_name in ws_names:
#         try:
#             extract_fields = ws_names[ws_name]
#         except KeyError:
#             raise error.Error(f'"{ws_name}"表格不存在。')
#         rows = wb[ws_name].values
#         fields = next(rows)
#         for name in extract_fields:
#             if name not in fields:
#                 raise error.Error(f'表格"{ws_name}"中的"{name}"项不存在。')
#         data = []
#         for row in rows:
#             if not any(row):
#                 continue
#             rowdata = dict(zip(fields, row))
#             data.append({key: rowdata[key] for key in extract_fields})
#         values[ws_name] = data
#     return values

def reset_col(file_path):
    """Restyle every cell and widen columns of the workbook at ``file_path``.

    For each worksheet in the workbook:

    1. Every cell is left-aligned, vertically centred and set to wrap text.
    2. Cells whose text starts with ``=HYPERLINK`` get a blue, italic,
       underlined font and are excluded from the width calculation.
    3. Each column's longest remaining text length is measured.  If that
       length exceeds the column's current width, the width becomes
       ``length * 1.2`` capped at 60; otherwise the width is left as it was
       so columns are never made narrower than they already are.

    The workbook is saved back to ``file_path``.

    Args:
        file_path: Path of the Excel file to modify in place.

    Returns:
        ``True`` once the workbook has been saved.
    """
    wb = load_workbook(file_path)
    # The style objects are identical for every cell, so build them once.
    cell_alignment = Alignment(horizontal="left", vertical='center', wrapText=True)
    link_font = Font(underline='single', italic=True, color='0066FF')

    # wb.worksheets skips chartsheets, which have no columns to resize.
    for ws in wb.worksheets:
        # enumerate() gives the 1-based column number directly instead of
        # searching a freshly built list of all columns for every column.
        for col_number, col in enumerate(ws.columns, start=1):
            letter = get_column_letter(col_number)
            original_width = ws.column_dimensions[letter].width
            max_length = 0
            for cell in col:
                try:
                    cell.alignment = cell_alignment
                    # An empty cell has no text; str(None) would count as 4.
                    text = '' if cell.value is None else str(cell.value)
                    if text.startswith('=HYPERLINK'):
                        cell.font = link_font
                        continue
                    max_length = max(max_length, len(text))
                except Exception:
                    # Styling is best effort: one odd cell must not abort the
                    # whole pass, but the failure is recorded in the log.
                    logger.exception('reset_col: failed to style cell %r', cell)

            preset_width = min(max_length * 1.2, 60)
            # NOTE(review): width is presumably always a number here; the None
            # guard only avoids a TypeError if a dimension carries no width.
            if original_width is None or max_length > original_width:
                adjusted_width = preset_width
            else:
                adjusted_width = original_width
            ws.column_dimensions[letter].width = adjusted_width
    wb.save(file_path)
    return True

def save_excel(file_path, headers_iter: Iterable, rows_iter: Iterable):
    r"""Create a new workbook and write a header plus rows to its default sheet.

    After saving, ``reset_col`` is applied to the file to style the cells and
    adjust the column widths.

    Args:
        file_path: Destination path, e.g. ``C:/Users/me/Desktop/out.xlsx``.
        headers_iter: The header row, e.g. ``(1, 2, 3)``.
        rows_iter: The data rows, e.g. ``[(11, None, 13), (21, '', 23)]``.
            Pass ``None`` when there are no data rows.  To insert a file
            link, use a formula string as the cell value:

            1) Absolute path (breaks when the file moves to another machine):
               ``'=HYPERLINK("{}","{}")'.format(r'file:///C:\222.png', 'popup.jpg')``
            2) Relative path (portable):
               ``'=HYPERLINK("{}","{}")'.format(r'./\222.png', 'popup.jpg')``
               for a picture next to the workbook, or
               ``'=HYPERLINK("{}","{}")'.format(r'./files\222.png', 'popup.jpg')``
               for a picture inside a sibling ``files`` folder.

    Returns:
        ``True`` once the file has been written and restyled.
    """
    wb = Workbook()
    # The default sheet created together with the workbook.
    ws = wb.active
    ws.append(headers_iter)
    # The documented contract allows None for "no rows"; iterating None
    # directly would raise TypeError.
    if rows_iter is not None:
        for row in rows_iter:
            ws.append(row)
    # resolve() returns a new path rather than modifying in place, so its
    # result has to be used for the absolute path to take effect.
    wb.save(Path(file_path).resolve())

    reset_col(file_path)
    return True

def save_new_sheet(excel_file, sheets_dic):
    """Open an existing workbook and (re)create one sheet per entry of ``sheets_dic``.

    A sheet that already exists under the same name is removed and rebuilt.
    For the sheet named ``'Miss_math'`` every cell flagged with ``1`` in
    ``mark_flag_lists`` is highlighted (orange fill, bold black font).  The
    workbook is saved and then passed through ``reset_col``.

    :param excel_file: path of the workbook to extend
    :param sheets_dic: mapping of sheet name -> list of rows (the first row
        is written like any other, so it should be the header).  The special
        key ``'mark_flag_lists'`` is not a sheet: it holds one list of 0/1
        flags per *data* row of ``'Miss_math'`` (header excluded), aligned
        with that row's columns.  The caller's dict is not modified.
    :return: None
    """
    # NOTE(review): data_only=True loads cached values instead of formulas, so
    # saving drops the formulas of the existing sheets -- confirm this is the
    # intended trade-off before changing it.
    workbook = load_workbook(excel_file, data_only=True)

    # Work on a shallow copy so the caller's dict keeps its flag entry, and
    # tolerate a missing entry instead of raising KeyError.
    sheets = dict(sheets_dic)
    mark_flag_lists = sheets.pop('mark_flag_lists', None) or []

    highlight_fill = PatternFill("solid", fgColor="FF6633")
    highlight_font = Font(color=colors.BLACK, bold=True)

    for sheetname, data in sheets.items():
        # Drop a stale sheet of the same name before recreating it.
        if sheetname in workbook.sheetnames:
            workbook.remove(workbook[sheetname])
        ws = workbook.create_sheet(title=sheetname)

        for row_index, row in enumerate(data):
            ws.append(row)

            # Only data rows (not the header) of Miss_math carry flags.
            if sheetname != 'Miss_math' or row_index == 0:
                continue
            flag_index = row_index - 1
            if flag_index >= len(mark_flag_lists):
                continue
            flags = mark_flag_lists[flag_index]
            # Slicing keeps the pairing safe when flags and row differ in length.
            for column_index, flag in enumerate(flags[:len(row)]):
                if flag == 1:
                    # Excel rows and columns are 1-based.
                    cell = ws.cell(row_index + 1, column_index + 1)
                    cell.fill = highlight_fill
                    cell.font = highlight_font

    # resolve() returns a new path; use the result so it is not a no-op.
    workbook.save(Path(excel_file).resolve())
    reset_col(excel_file)

def load_excel(excel_file: str, primary_key_lis: list, sheet_names: Iterable = None) -> list:
    """Load the rows of the requested sheets, keyed by a two-column primary key.

    The first row of every sheet must be the table header.

    Args:
        excel_file: Path of the Excel file.
        primary_key_lis: Two 0-based column indexes.  The values of these two
            columns are joined as ``'<first>_<second>'`` to form the key that
            identifies a row.
        sheet_names: Names of the sheets to read.  ``None`` (or empty) reads
            every sheet of the workbook.

    Returns:
        A three-item list ``[result, duplicate_rows_list, rows_num_list]``:

        * ``result``: dict of sheet name -> dict of primary key -> ``RowTuple``.
          Each inner dict additionally holds the header list under the key
          ``'headers_lis'``.
        * ``duplicate_rows_list``: rows whose primary key was already seen in
          the same sheet; each is a list with the sheet name inserted in
          front of the row's values.  Only the first occurrence of a key is
          kept in ``result``.
        * ``rows_num_list``: the number of non-blank data rows read per sheet
          (duplicates included), in the order of ``sheet_names``.

    Raises:
        error.Error: If a requested sheet does not exist in the workbook.
    """
    logger.info(f'load_excel: primary_key_lis:{primary_key_lis}, sheet_names:{sheet_names}')
    workbook = load_workbook(excel_file, data_only=True)
    if not sheet_names:
        sheet_names = workbook.sheetnames
    first_col, second_col = primary_key_lis[0], primary_key_lis[1]

    result = {}
    duplicate_rows_list = []
    rows_num_list = []
    for ws_name in sheet_names:
        logger.info(f'load_excel: start load {ws_name}')
        if ws_name not in workbook.sheetnames:
            # logger.exception is only meaningful inside an except block;
            # here there is no active exception to attach.
            logger.error(f'表格"{ws_name}"不存在。')
            raise error.Error(f'表格"{ws_name}"不存在。')
        sheet = workbook[ws_name]
        # The first row is the header.
        header = [c.value for c in sheet[1]]

        rows_dic = {}
        rows = 0
        for values in sheet.iter_rows(min_row=2, values_only=True):
            # Completely blank rows are not data; without this check they
            # would be keyed 'None_None' and reported as duplicates.
            if all(value is None for value in values):
                continue
            rows += 1

            # Join two columns into the key used to pair rows across sheets.
            rows_dic_key = f'{str(values[first_col])}_{str(values[second_col])}'

            # A repeated key means bad data or a poorly chosen primary key.
            if rows_dic_key in rows_dic:
                logger.warning(f'find duplicate_rows :{rows_dic_key} value:{values}')
                duplicate_rows_list.append([ws_name, *values])
                continue

            rows_dic[rows_dic_key] = RowTuple(header, values)

        rows_num_list.append(rows)

        # Store the header alongside the rows so callers can build header rows.
        rows_dic['headers_lis'] = header
        result[ws_name] = rows_dic

    return [result, duplicate_rows_list, rows_num_list]

def compare_excels(excel_file: str, primary_key_lis: list, sheet1: str, sheet2: str, ignore_columns_list: Sequence=() ):
    """Compare two sheets of one workbook row by row and write the differences back.

    Rows are paired through a primary key built from two columns (see
    ``load_excel``).  Results are written into the same workbook via
    ``save_new_sheet`` and a summary line is logged.

    :param excel_file: path of the workbook
    :param primary_key_lis: 0-based indexes of the two columns joined into the
        primary key, e.g. ``[0, 1]`` joins the first and second column
    :param sheet1: name of the first sheet to compare
    :param sheet2: name of the second sheet to compare
    :param ignore_columns_list: 0-based indexes of columns to leave out of the
        comparison
    :return: None.  The workbook gains four sheets: ``<sheet1>_only``,
        ``<sheet2>_only``, ``Miss_math`` (both versions of every differing
        row, prefixed with the source sheet name and with differing cells
        highlighted) and ``Duplicate``.
    """
    logger.info(f'compare_excels: primary_key_lis:{primary_key_lis}, sheet1:{sheet1}, sheet2:{sheet2}, ignore_columns_list:{ignore_columns_list}')
    result, duplicate_rows_list, rows_num_list = load_excel(
        excel_file=excel_file, primary_key_lis=primary_key_lis, sheet_names=[sheet1, sheet2])
    sheet1_dic = result[sheet1]
    sheet2_dic = result[sheet2]
    sheet1_num, sheet2_num = rows_num_list

    # load_excel stores the header under this key next to the data rows; it
    # must be kept out of the row comparison below.
    header_key = 'headers_lis'
    sheet1_header = sheet1_dic[header_key]
    sheet2_header = sheet2_dic[header_key]
    if list(sheet1_header) != list(sheet2_header):
        logger.warning(f'compare_excels: headers differ, {sheet1}:{sheet1_header}, {sheet2}:{sheet2_header}')

    # Rows present in only one sheet can never match; collect them per sheet,
    # in the order they appear in that sheet.
    sheet1_only_list = [sheet1_header]
    sheet1_only_list.extend(
        row for key, row in sheet1_dic.items()
        if key != header_key and key not in sheet2_dic)
    sheet2_only_list = [sheet2_header]
    sheet2_only_list.extend(
        row for key, row in sheet2_dic.items()
        if key != header_key and key not in sheet1_dic)

    miss_math_list = [['source', *sheet1_header]]
    ignored_columns = set(ignore_columns_list)

    match_num = 0        # number of paired rows without any difference
    miss_math_num = 0    # number of differing cells (not rows)
    mark_flag_lists = []  # one 0/1 flag list per row written to Miss_math

    # Rows present in both sheets: compare them cell by cell.
    for key, row_sheet1 in sheet1_dic.items():
        if key == header_key or key not in sheet2_dic:
            continue
        row_sheet2 = sheet2_dic[key]
        len1, len2 = len(row_sheet1), len(row_sheet2)

        # The leading 0 belongs to the extra 'source' column, never marked.
        mark_flag_list = [0]
        # Cover the wider row so a column missing on one side counts as a
        # difference instead of raising IndexError.
        for i in range(max(len1, len2)):
            if i in ignored_columns:
                mark_flag_list.append(0)
                continue
            if i < len1 and i < len2 and row_sheet1[i] == row_sheet2[i]:
                mark_flag_list.append(0)
            else:
                mark_flag_list.append(1)
                miss_math_num += 1

        if 1 not in mark_flag_list:
            match_num += 1
            continue

        # A differing pair is written as two rows, one per sheet, and both
        # use the same flags.
        miss_math_list.append([sheet1, *row_sheet1])
        miss_math_list.append([sheet2, *row_sheet2])
        mark_flag_lists.append(mark_flag_list)
        mark_flag_lists.append(mark_flag_list)

    sheets_dict = {
        f'{sheet1}_only': sheet1_only_list,
        f'{sheet2}_only': sheet2_only_list,
        f'Miss_math': miss_math_list,
        f'Duplicate': duplicate_rows_list,
        'mark_flag_lists': mark_flag_lists,  # [[0,0,1], [1,0,0], ...]
    }

    save_new_sheet(excel_file, sheets_dic=sheets_dict)

    duplicate_rows = len(duplicate_rows_list)
    # The first entry of each *_only list is the header row.
    sheet1_only = len(sheet1_only_list) - 1
    sheet2_only = len(sheet2_only_list) - 1
    logger.info(f'{sheet1}:{sheet1_num}, {sheet2}:{sheet2_num}, match_num:{match_num}, miss_math_num:{miss_math_num},duplicate_rows:{duplicate_rows}, {sheet1}_only:{sheet1_only}, {sheet2}_only:{sheet2_only}')

 

标签:rows,sheet,list,sheet1,key,sheet2,111
来源: https://www.cnblogs.com/xp1315458571/p/16310151.html