项目稍微有点复杂,使用Python将录音文件归集起来,需要的信息有分别是,execl文件,mysql文件,录音文件,将其组装起来,增加了进度条,方便使用的同事能看到进度,代码中的内容已经脱敏,熟悉Python的小伙伴一看就知道是怎么个事?
# coding=utf-8
"""
@Time : 2023/10/26 14:21
@Author : Taering
@File :main.py
@IDE :PyCharm
@DESC : 从execl文件及数据库文件匹配录音生成相应的索引
"""
import logging
import os
import pandas as pd
from tqdm import tqdm
from datetime import datetime, timedelta
import time
base_path = os.path.abspath(os.path.dirname(__file__))
execl_path = os.path.join(base_path, 'execl_files')
done_path = os.path.join(base_path, 'done')
record_path = os.path.join(base_path, 'records')
csv_path = os.path.join(base_path, 'sql_file', 'data.csv')
print(f'execl文件的目录为:{execl_path}')
print(f'done文件的目录为:{done_path}')
print(f'record文件的目录为:{record_path}')
logging.basicConfig(filename='logs/run.log', level=logging.INFO, format='%(asctime)s - %(message)s')
print(f'开始将数据库文件导入内存')
start_sqlfile_time = time.time()
cdr_data = pd.read_csv(csv_path)
print(f'sql文件导入内存完成,耗时{time.time() - start_sqlfile_time}秒')
def main():
# 读取Excel文件
print('开始读取Execl文件')
start_execl_time = time.time()
logging.info('开始读取Execl文件')
# 初始化两个空的DataFrames
df_a_list = []
df_b_list = []
file_list = os.listdir(execl_path)
for file in file_list:
if file.endswith('.xlsx'):
df_temp = pd.read_excel(os.path.join(execl_path, file))
if 'A的特征' in df_temp.columns:
df_a_list.append(df_temp)
elif 'B的特征' in df_temp.columns:
df_b_list.append(df_temp)
# 使用pd.concat来合并数据
df_a = pd.concat(df_a_list, ignore_index=True)
df_b = pd.concat(df_b_list, ignore_index=True)
num_records_a = df_a.shape[0]
num_records_b = df_b.shape[0]
print(
f"读取execl完成,df_a中有{num_records_a}条记录,df_b中有{num_records_b}条记录,共耗时:{time.time() - start_execl_time}秒")
count = 0
# 创建字典来存储文件路径和UUID的映射
print(f'开始映射文件')
start_file_time = time.time()
file_dict = {}
for root, dirs, files in os.walk(record_path):
for file in files:
uuid = file.split('_')[0]
file_dict[uuid] = os.path.join(root, file)
print(f'已映射 {len(file_dict)} 条录音,消耗时间为:{time.time() - start_file_time}秒')
for _, row in tqdm(df_a.iterrows(), total=df_a.shape[0], desc="正在匹配录音,请稍等",
bar_format="{l_bar}{bar}| {percentage:3.2f}%"):
phone_num = str(row["需要匹配的内容"]).strip()
if phone_num.isnumeric():
# 将字符串转换为datetime对象
collection_time = datetime.strptime(row['时间'], '%Y-%m-%d %H:%M:%S')
# 计算开始和结束的时间范围
start_time = collection_time - timedelta(minutes=30)
end_time = collection_time
mask = ((cdr_data['caller_id_number'] == phone_num) | (cdr_data['destination_number'] == phone_num)) & \
(cdr_data['start_stamp'] >= start_time.strftime('%Y-%m-%d %H:%M:%S')) & \
(cdr_data['start_stamp'] <= end_time.strftime('%Y-%m-%d %H:%M:%S'))
results = cdr_data[mask].values
for result in results:
uuid = result[1]
if uuid in file_dict:
file_path = file_dict[uuid]
logging.info('已匹配到录音文件,正在处理...')
print('已匹配到录音文件,正在处理...')
file_name = os.path.basename(file_path)
year = file_name.split('_')[-1][:4]
day_month_year = file_name.split('_')[-1][:8]
dest_folder = os.path.join(done_path, year, day_month_year)
if not os.path.exists(dest_folder):
os.makedirs(dest_folder)
os.makedirs(dest_folder, exist_ok=True)
new_file_name = f"{file_name.split('_')[-2]}_{file_name.split('_')[-1]}.mp3"
os.rename(file_path, os.path.join(dest_folder, new_file_name))
print(f'复制文件完成:{new_file_name}')
logging.info(f'复制文件完成:{new_file_name}')
# 创建或更新TXT文件
txt_file = os.path.join(dest_folder, f"DATA_{day_month_year}.txt")
if not os.path.exists(txt_file):
with open(txt_file, 'w') as txt:
txt.write(
"示例")
with open(txt_file, 'a') as txt:
txt.write(
f"{示例")
print('写入索引文件完成')
logging.info('写入索引文件完成')
count += 1
logging.info(f'已完成{count}条录音匹配')
print(f'已完成{count}条录音匹配')
break
logging.info(f"完成匹配录音,共匹配到 {count} 条录音。\n")
print(f"完成匹配录音,共匹配到 {count} 条录音。\n")
if __name__ == "__main__":
main()
项目的execl文件有两种,其中A是查询条件的主要execl文件,B是A的补充,可以从B中拿到A没有信息,然后匹配数据库文件,如果能匹配上再去找相应的文件,找到后按照指定的格式放到项目的done目录里。
项目涉及5个目录,分别是存放成果的done,存放录音文件的records,存放execl文件的execl_files,存放sql文件的sql_file,存放日志的log
录音和execl直接存放即可,程序会去判定哪些是A,哪些是B,录音的话程序会将其文件名及路径保存到内存中,方便后续的查找,mysql导出的文件必须是csv格式,且文件名写死是data,你要改的话自己改下代码