03
2025
04
15:22:38

Excel数据导入数据库MySQL工具

excel数据导入mysql工具(mysql批量入库),可以批量导入文件夹下所有.xlsx表格数据。

使用方法

1、配置.ini文件

修改目录下“配置.ini”文件的数据库连接参数、包括:数据库名称、表名称、excel列名称和对应的数据库字段名称。

Excel数据导入数据库MySQL工具  第1张

2、点击“测试连接”

3、点击“导入数据”

选择.xlsx 文件所在的文件夹。软件会自动遍历文件夹下所有.xlsx文件,并导入数据。

Excel数据导入数据库MySQL工具  第2张

源码内容:


import tkinter as tkfrom tkinter import filedialog, messagebox, ttkimport threadingimport pandas as pdimport mysql.connectorimport osimport loggingfrom logging.handlers import RotatingFileHandlerimport configparser# 设置日志记录logger = logging.getLogger(__name__)

logger.setLevel(logging.DEBUG)

handler = RotatingFileHandler('app.log', maxBytes=1000000, backupCount=5)

formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

handler.setFormatter(formatter)

logger.addHandler(handler)class ExcelToMySQLApp:

    def __init__(self, master):

        self.master = master

        master.title("Excel数据导入MySQL工具-老吴搭建教程www.lw50.com")


        # 加载配置文件        config = configparser.ConfigParser()

        config.read('配置.ini', encoding='utf-8')


        # 数据库连接信息        self.db_config = {

            'host': config.get('Database', 'host'),

            'port': config.getint('Database', 'port'),

            'user': config.get('Database', 'user'),

            'password': config.get('Database', 'password'),

            'database': config.get('Database', 'name'),  # 使用'database'作为关键字参数        }


        # 表名称        self.table_name = config.get('Table', 'name')


        # Excel 列名        self.excel_columns = config.get('ExcelColumns', 'columns').split(',')


        # 数据库字段名        self.database_columns = config.get('DatabaseColumns', 'columns').split(',')


        # 初始化界面组件        self.create_ui()


    def create_ui(self):

        # 主界面布局        self.db_frame = ttk.LabelFrame(self.master, text="数据库连接设置")

        self.db_frame.grid(row=0, column=0, sticky=(tk.W + tk.E), padx=5, pady=5)


        # 测试连接按钮和连接状态标签放在同一行        self.test_connection_button = ttk.Button(self.db_frame, text="测试连接", command=self.test_connection)

        self.test_connection_button.grid(row=0, column=0, sticky=tk.W, pady=5)


        self.db_connection_status_label = ttk.Label(self.db_frame, text="数据库未连接")

        self.db_connection_status_label.grid(row=0, column=1, sticky=tk.W)


        self.db_name_label = ttk.Label(self.db_frame, text="数据库名称:")

        self.db_name_label.grid(row=1, column=0, sticky=tk.W)


        self.db_name_value_label = ttk.Label(self.db_frame, text=self.db_config['database'])

        self.db_name_value_label.grid(row=1, column=1, sticky=tk.W)


        self.table_name_label = ttk.Label(self.db_frame, text="表名称:")

        self.table_name_label.grid(row=2, column=0, sticky=tk.W)


        self.table_name_value_label = ttk.Label(self.db_frame, text=self.table_name)

        self.table_name_value_label.grid(row=2, column=1, sticky=tk.W)


        # 数据来源选择区        self.source_frame = ttk.LabelFrame(self.master, text="数据来源")

        self.source_frame.grid(row=1, column=0, sticky=(tk.W + tk.E), padx=5, pady=5)


        self.select_folder_button = ttk.Button(self.source_frame, text="选择文件夹", command=self.select_folder)

        self.select_folder_button.grid(row=0, column=0, sticky=tk.W, pady=5)


        self.selected_folder_label = ttk.Label(self.source_frame, text="未选择文件夹")

        self.selected_folder_label.grid(row=1, column=0, sticky=tk.W)


        # 映射关系展示区        self.mapping_frame = ttk.LabelFrame(self.master, text="列名映射")

        self.mapping_frame.grid(row=2, column=0, sticky=(tk.W + tk.E), padx=5, pady=5)

        self.mapping_tree = ttk.Treeview(self.mapping_frame, columns=("Excel Column", "DB Column"), show="headings")

        self.mapping_tree.heading("Excel Column", text="Excel Column")

        self.mapping_tree.heading("DB Column", text="DB Column")

        for excel_col, db_col in zip(self.excel_columns, self.database_columns):

            self.mapping_tree.insert("", "end", values=(excel_col, db_col))

        self.mapping_tree.pack(fill=tk.BOTH, expand=True)


        # 导入数据按钮        self.import_button = ttk.Button(self.master, text="导入数据", command=self.start_import_process,

                                        state=tk.DISABLED)

        self.import_button.grid(row=3, column=0, sticky=tk.W, pady=5)


        # 可导入状态标签        self.ready_to_import_label = ttk.Label(self.master, text="未准备好导入")

        self.ready_to_import_label.grid(row=4, column=0, sticky=tk.W)


        # 文件进度条        self.file_progressbar = ttk.Progressbar(self.master, orient="horizontal", length=200, mode="determinate")

        self.file_progressbar.grid(row=5, column=0, sticky=tk.W)


        # 文件进度状态标签        self.file_progress_status_label = ttk.Label(self.master, text="文件进度: ")

        self.file_progress_status_label.grid(row=6, column=0, sticky=tk.W)


        # 总进度条        self.total_progressbar = ttk.Progressbar(self.master, orient="horizontal", length=200, mode="determinate")

        self.total_progressbar.grid(row=7, column=0, sticky=tk.W)


        # 总进度状态标签        self.overall_progress_status_label = ttk.Label(self.master, text="总进度: ")

        self.overall_progress_status_label.grid(row=8, column=0, sticky=tk.W)


    def test_connection(self):

        # 测试数据库连接        try:

            connection = mysql.connector.connect(**self.db_config)

            connection.close()

            self.db_connection_status_label.config(text="数据库已连接")

            self.test_connection_button.config(state=tk.DISABLED)

            logger.info("数据库连接成功")

            messagebox.showinfo("成功", "数据库连接成功!")

        except Exception as e:

            self.db_connection_status_label.config(text="数据库连接失败")

            logger.error(f"数据库连接失败: {e}")

            messagebox.showerror("错误", f"数据库连接失败: {e}")


    def select_folder(self):

        # 选择文件夹        directory = filedialog.askdirectory()

        if directory:

            self.selected_folder_label.config(text=directory)

            self.check_excel_columns(directory)


    def check_excel_columns(self, directory):

        # 检查文件夹中的 Excel 列名        first_excel_file = next((f for f in os.listdir(directory) if f.endswith('.xlsx')), None)

        if first_excel_file:

            filepath = os.path.join(directory, first_excel_file)

            try:

                df = pd.read_excel(filepath, usecols=self.excel_columns, nrows=1)

                logger.debug(f"检查文件 {first_excel_file} 的列名:{df.columns}")

                if set(self.excel_columns).issubset(set(df.columns)):

                    logger.info("检查完成,Excel 列名和配置文件中的列名匹配!")

                    messagebox.showinfo("提示", "Excel 列名与配置文件中的列名匹配!")

                    self.enable_import_button()

                else:

                    logger.warning(

                        f"Excel 列名与配置文件中的列名不匹配!缺少列:{set(self.excel_columns) - set(df.columns)}")

                    messagebox.showwarning("警告",

                                           f"Excel 列名与配置文件中的列名不匹配!缺少列:{set(self.excel_columns) - set(df.columns)}")

            except Exception as e:

                logger.error(f"无法读取文件 {first_excel_file}: {e}")

                messagebox.showerror("错误", f"无法读取文件 {first_excel_file}: {e}")

                return    def enable_import_button(self):

        # 启用导入数据按钮        self.import_button.config(state=tk.NORMAL)

        self.ready_to_import_label.config(text="准备好导入")


    def start_import_process(self):

        # 开始导入数据过程        self.import_button.config(state=tk.DISABLED)

        self.ready_to_import_label.config(text="正在导入...")

        selected_folder = self.selected_folder_label.cget("text")

        if not selected_folder or selected_folder == "未选择文件夹":

            messagebox.showwarning("警告", "请先选择文件夹!")

            return        self.files_to_process = [f for f in os.listdir(selected_folder) if f.endswith('.xlsx')]

        self.total_rows = sum(len(pd.read_excel(os.path.join(selected_folder, f), usecols=self.excel_columns)) for f in                              self.files_to_process)

        self.inserted_rows = 0        logger.info("连接数据库...")

        self.db = mysql.connector.connect(**self.db_config)

        logger.info("数据库已连接")

        self.insert_sql = f"INSERT INTO {self.table_name} ({', '.join(self.database_columns)}) VALUES ({', '.join(['%s'] * len(self.database_columns))});"        # 检查数据库表结构        cursor = self.db.cursor()

        try:

            self.check_table_structure(cursor)

        finally:

            cursor.close()


        threading.Thread(target=self.import_data_thread, args=(selected_folder,)).start()


    def import_data_thread(self, selected_folder):

        # 导入数据线程        cursor = self.db.cursor()

        try:

            for file_index, filename in enumerate(self.files_to_process):

                filepath = os.path.join(selected_folder, filename)

                df = pd.read_excel(filepath, usecols=self.excel_columns)

                # 数据清洗                mapped_df = self.clean_data(df)

                mapped_values = mapped_df.values.tolist()

                self.current_file_rows = len(mapped_values)

                self.current_inserted_rows = 0                batch_size = 1000  # 批量插入的大小                total_batches = len(mapped_values) // batch_size + (1 if len(mapped_values) % batch_size > 0 else 0)


                for batch_index in range(total_batches):

                    start = batch_index * batch_size

                    end = min((batch_index + 1) * batch_size, len(mapped_values))

                    current_batch = mapped_values[start:end]


                    # 开始事务                    cursor.execute("START TRANSACTION")


                    # 尝试插入数据                    try:

                        cursor.executemany(self.insert_sql, current_batch)

                        affected_rows = cursor.rowcount

                        # 提交事务                        cursor.execute("COMMIT")

                        logger.info(

                            f"Batch {batch_index + 1}/{total_batches} of file {filename} successfully inserted. Affected rows: {affected_rows}")

                        self.current_inserted_rows += affected_rows

                    except Exception as e:

                        # 记录错误信息,并回滚事务                        logger.error(

                            f"From {filename}: Error inserting batch {batch_index + 1}/{total_batches}: {str(e)}")

                        cursor.execute("ROLLBACK")

                        self.retry_insert(cursor, self.insert_sql, current_batch, filename)


                    # 更新文件进度状态                    self.file_progress_status_label.config(

                        text=f"正在处理 {filename},已处理行数: {self.current_inserted_rows}/{self.current_file_rows}")

                    self.file_progressbar['value'] = (self.current_inserted_rows * 100) / self.current_file_rows


                    # 更新总进度状态                    self.inserted_rows += affected_rows

                    self.overall_progress_status_label.config(

                        text=f"总进度: {self.inserted_rows}/{self.total_rows}")

                    self.total_progressbar['value'] = (self.inserted_rows * 100) / self.total_rows


                    # 模拟主循环更新UI                    self.master.update_idletasks()


            if self.inserted_rows == self.total_rows:

                messagebox.showinfo("提示", "导入完成!")

        finally:

            cursor.close()  # 确保在任何情况下都关闭游标    def check_table_structure(self, cursor):

        cursor.execute(f"DESCRIBE {self.table_name}")

        table_structure = cursor.fetchall()

        logger.info(f"数据库表 {self.table_name} 字段结构:{table_structure}")


        # 检查数据库表字段名与 Excel 列名是否匹配        db_columns = [col_info[0] for col_info in table_structure]

        missing_db_columns = [col for col in self.database_columns if col not in db_columns]

        if missing_db_columns:

            logger.error(f"数据库表 {self.table_name} 缺少字段:{missing_db_columns}")

            raise ValueError(f"数据库表 {self.table_name} 缺少字段:{missing_db_columns}")


    def clean_data(self, df):

        # 数据清洗        # 确保所有数据在插入数据库之前已经进行了适当的清洗        df = df.where(pd.notna(df), None)

        return df


    def retry_insert(self, cursor, insert_sql, data, filename):

        max_retries = 3        retries = 0        while retries < max_retries:

            try:

                cursor.execute("START TRANSACTION")

                cursor.executemany(insert_sql, data)

                cursor.execute("COMMIT")

                logger.info(f"Retry successful after {retries + 1} attempts for file {filename}.")

                self.current_inserted_rows += len(data)

                break            except Exception as e:

                retries += 1                logger.error(f"Retry {retries}/{max_retries} failed for {filename}: {str(e)}")

                cursor.execute("ROLLBACK")

                if retries == max_retries:

                    # 在最大重试次数后记录错误行                    logger.error(f"Max retries reached for {filename}. Attempting to insert rows individually.")

                    for row_index, row in enumerate(data):

                        try:

                            cursor.execute("START TRANSACTION")

                            cursor.execute(insert_sql, row)

                            cursor.execute("COMMIT")

                            self.current_inserted_rows += 1                        except Exception as single_row_error:

                            logger.error(

                                f"Failed to insert row {row} at index {row_index} from {filename}: {single_row_error}")

                            cursor.execute("ROLLBACK")

                    breakif __name__ == "__main__":

    root = tk.Tk()

    app = ExcelToMySQLApp(root)

    root.mainloop()


附件地址:

附件为编译后exe文件:Excel数据导入数据库MySQL工具.exe

链接: https://pan.baidu.com/s/1SewUtbUNNxvTZknZDNwEQA?pwd=ks18 提取码: ks18 



推荐本站淘宝优惠价购买喜欢的宝贝:

image.png

本文链接:https://hqyman.cn/post/10125.html 非本站原创文章欢迎转载,原创文章需保留本站地址!

分享到:
打赏





休息一下~~


« 上一篇 下一篇 »

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

请先 登录 再评论,若不是会员请先 注册

您的IP地址是: