我孤身走在路上, 石子在雾中发亮,夜很安静,荒原面对太空,星星互诉衷肠
使用Python归集录音
使用Python归集录音

使用Python归集录音

项目稍微有点复杂,使用Python将录音文件归集起来,需要的信息有分别是,execl文件,mysql文件,录音文件,将其组装起来,增加了进度条,方便使用的同事能看到进度,代码中的内容已经脱敏,熟悉Python的小伙伴一看就知道是怎么个事?

# coding=utf-8
"""
@Time : 2023/10/26 14:21
@Author : Taering
@File :main.py
@IDE :PyCharm
@DESC : 从execl文件及数据库文件匹配录音生成相应的索引
"""

import logging
import os
import pandas as pd
from tqdm import tqdm
from datetime import datetime, timedelta
import time

base_path = os.path.abspath(os.path.dirname(__file__))
execl_path = os.path.join(base_path, 'execl_files')
done_path = os.path.join(base_path, 'done')
record_path = os.path.join(base_path, 'records')
csv_path = os.path.join(base_path, 'sql_file', 'data.csv')
print(f'execl文件的目录为:{execl_path}')
print(f'done文件的目录为:{done_path}')
print(f'record文件的目录为:{record_path}')
logging.basicConfig(filename='logs/run.log', level=logging.INFO, format='%(asctime)s - %(message)s')

print(f'开始将数据库文件导入内存')
start_sqlfile_time = time.time()
cdr_data = pd.read_csv(csv_path)
print(f'sql文件导入内存完成,耗时{time.time() - start_sqlfile_time}秒')

def main():
    # 读取Excel文件
    print('开始读取Execl文件')
    start_execl_time = time.time()
    logging.info('开始读取Execl文件')
    # 初始化两个空的DataFrames
    df_a_list = []
    df_b_list = []
    file_list = os.listdir(execl_path)
    for file in file_list:
        if file.endswith('.xlsx'):
            df_temp = pd.read_excel(os.path.join(execl_path, file))
            if 'A的特征' in df_temp.columns:
                df_a_list.append(df_temp)
            elif 'B的特征' in df_temp.columns:
                df_b_list.append(df_temp)
    # 使用pd.concat来合并数据
    df_a = pd.concat(df_a_list, ignore_index=True)
    df_b = pd.concat(df_b_list, ignore_index=True)
    num_records_a = df_a.shape[0]
    num_records_b = df_b.shape[0]
    print(
        f"读取execl完成,df_a中有{num_records_a}条记录,df_b中有{num_records_b}条记录,共耗时:{time.time() - start_execl_time}秒")
    count = 0
    # 创建字典来存储文件路径和UUID的映射
    print(f'开始映射文件')
    start_file_time = time.time()
    file_dict = {}
    for root, dirs, files in os.walk(record_path):
        for file in files:
            uuid = file.split('_')[0]
            file_dict[uuid] = os.path.join(root, file)
    print(f'已映射 {len(file_dict)} 条录音,消耗时间为:{time.time() - start_file_time}秒')
    for _, row in tqdm(df_a.iterrows(), total=df_a.shape[0], desc="正在匹配录音,请稍等",
                       bar_format="{l_bar}{bar}| {percentage:3.2f}%"):
        phone_num = str(row["需要匹配的内容"]).strip()
        if phone_num.isnumeric():
            # 将字符串转换为datetime对象
            collection_time = datetime.strptime(row['时间'], '%Y-%m-%d %H:%M:%S')
            # 计算开始和结束的时间范围
            start_time = collection_time - timedelta(minutes=30)
            end_time = collection_time
            mask = ((cdr_data['caller_id_number'] == phone_num) | (cdr_data['destination_number'] == phone_num)) & \
                   (cdr_data['start_stamp'] >= start_time.strftime('%Y-%m-%d %H:%M:%S')) & \
                   (cdr_data['start_stamp'] <= end_time.strftime('%Y-%m-%d %H:%M:%S'))
            results = cdr_data[mask].values
            for result in results:
                uuid = result[1]
                if uuid in file_dict:
                    file_path = file_dict[uuid]
                    logging.info('已匹配到录音文件,正在处理...')
                    print('已匹配到录音文件,正在处理...')
                    file_name = os.path.basename(file_path)
                    year = file_name.split('_')[-1][:4]
                    day_month_year = file_name.split('_')[-1][:8]
                    dest_folder = os.path.join(done_path, year, day_month_year)
                    if not os.path.exists(dest_folder):
                        os.makedirs(dest_folder)
                    os.makedirs(dest_folder, exist_ok=True)
                    new_file_name = f"{file_name.split('_')[-2]}_{file_name.split('_')[-1]}.mp3"
                    os.rename(file_path, os.path.join(dest_folder, new_file_name))
                    print(f'复制文件完成:{new_file_name}')
                    logging.info(f'复制文件完成:{new_file_name}')
                    # 创建或更新TXT文件
                    txt_file = os.path.join(dest_folder, f"DATA_{day_month_year}.txt")
                    if not os.path.exists(txt_file):
                        with open(txt_file, 'w') as txt:
                            txt.write(
                                "示例")
                    with open(txt_file, 'a') as txt:
                        txt.write(
                            f"{示例")
                        print('写入索引文件完成')
                        logging.info('写入索引文件完成')
                        count += 1
                        logging.info(f'已完成{count}条录音匹配')
                        print(f'已完成{count}条录音匹配')
                        break
    logging.info(f"完成匹配录音,共匹配到 {count} 条录音。\n")
    print(f"完成匹配录音,共匹配到 {count} 条录音。\n")


if __name__ == "__main__":
    main()

项目的execl文件有两种,其中A是查询条件的主要execl文件,B是A的补充,可以从B中拿到A没有信息,然后匹配数据库文件,如果能匹配上再去找相应的文件,找到后按照指定的格式放到项目的done目录里。

项目涉及5个目录,分别是存放成果的done,存放录音文件的records,存放execl文件的execl_files,存放sql文件的sql_file,存放日志的log

录音和execl直接存放即可,程序会去判定哪些是A,哪些是B,录音的话程序会将其文件名及路径保存到内存中,方便后续的查找,mysql导出的文件必须是csv格式,且文件名写死是data,你要改的话自己改下代码

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

+ 18 = 23