python 解决动态的定义变量名,并给其赋值的方法(大数据处理)

作者:威震四海 时间:2021-10-09 10:44:58 

最近消费kafka数据到磁盘的时候遇到了这样的问题:

需求:每天大概有1千万条数据,每条数据包含19个字段信息,需要将数据写到服务器磁盘,以第二个字段作为大类建立目录,第7个字段作为小类配合时间戳作为文件名,临时文件后缀tmp,当每个文件的写入条数(可配置,比如100条)达到要求条数时,将后缀tmp改为out。

问题:大类共有30个,小类不计其数而且未知,比如大类为A,小类为a,时间戳为20180606095835234,则A目录下的文件名为20180606095835234_a.tmp,这样一来需要在此文件写满100条时,更新时间戳生成第二个文件名,如果此时有1000个文件都在写则需要有1000个时间戳,和1000个计数器记录每个文件当前的条数,如果分别定义1000个变量显然是不划算的,

尝试:中间过程想到了动态定义变量名,即

定义第七个字段:seven = data.split('|')[7]

定义文件名:filename = time_stamp + '_' + seven+'.tmp',

定义文件计数器:seven + ‘_num' = 0

定义文件时间戳:seven + '_stamp' = time.time( )

想法其实是没问题的,但是这里用到了一个不常用的语法:用一个变量名和一个字符串拼接出来一个新的变量名,并继续赋值(不知道我的表述是否清楚),试过了用local()函数、global()函数、exec()函数都没有达到预期效果,也许是把问题想的太复杂了

解决:最后使用三个字典将这个问题完美解决,

定义一个字典用来存计数器,字典的每一个键对应一个文件名,值对应当前计数,并实时更新;

定义一个字典用来存时间戳,键对应一个文件名,值对应时间戳,达到100条就更新一次;

定义一个字典用来存大类,键对应代号,值对应分类;

局部功能代码如下:


def kafka_to_disk():
print('启动前检测上次运行时是否存在意外中断的数据文件......')
print('搜索最近一次执行脚本产生的时间目录......')
# 待处理临时文件列表
tmp_list = []
try:
 for category_dir in os.listdir(local_file_path):
  if len(os.listdir(local_file_path+os.sep+category_dir)) > 0:
   for file in os.listdir(local_file_path+os.sep+category_dir):
    if suffix in file:
     tmp_list.append(local_file_path+os.sep+category_dir+os.sep+file)
 # print('上次运行程序产生的临时文件有---{}'.format(tmp_list))
except Exception as e:
 pass
if len(tmp_list) == 0:
 print('未扫描任何残留临时文件')
else:
 print('开始修复残留临时文件......')
tmp_num = 0
for tmp in tmp_list:
 os.rename(tmp, tmp.split('.')[0]+'.out')
 tmp_num += 1
print('本次启动共修复残留临时文件★★★★★-----{}个-----★★★★★'.format(tmp_num))

category_poor = {
 '1': 'news', '2': 'weibo', '3': 'weixin', '4': 'app', '5': 'newspaper', '6': 'luntan',
 '7': 'blog', '8': 'video', '9': 'shangji', '10': 'shangjia', '11': 'gtzy', '12': 'zfztb',
 '13': 'gyfp', '14': 'gjz', '15': 'zfxx', '16': 'ptztb', '17': 'company', '18': 'house',
 '19': 'hospital', '20': 'bank', '21': 'zone', '22': 'express', '23': 'zpgw', '24': 'zscq',
 '25': 'hotel', '26': 'cpws', '27': 'gxqy', '28': 'gpjj', '29': 'dtyy', '30': 'bdbk'}

time_stamp = utils.get_time_stamp() # 初始化毫秒级时间戳 : 20180509103015125
consumer = KafkaConsumer(topic, group_id=group_id, auto_offset_reset=auto_offset_reset, bootstrap_servers=eval(bootstrap_servers))
print('连接kafka成功,数据筛选中......')
file_poor = {}       # 子类池用于文件计数器
time_stamp_poor = {}     # 子类时间戳池,用于触发文件切换
time_stamp = utils.get_time_stamp()  # 初始化毫秒级时间戳 :20180509103015125
for message in consumer:
 # 提取第8个字段自动匹配目录进行创建
 if message.value.decode().split('|')[1] in category_poor:
  category = category_poor[message.value.decode().split('|')[1]]
 else:
  print(message.value.decode())
  continue
 category_dir = local_file_path + os.sep + category
 if not os.path.exists(category_dir):
  os.makedirs(category_dir)
 # 提取第2个字段,用于生成文件名
 if message.value.decode().split('|')[7] in time_stamp_poor:
  shot_file_name = time_stamp_poor[message.value.decode().split('|')[7]] + '_' + message.value.decode().split('|')[7]
 else:
  shot_file_name = time_stamp + '_' + message.value.decode().split('|')[7]
 file_name = category_dir + os.sep + shot_file_name + '.tmp'

# 给每一个文件设定一个计数器
 if message.value.decode().split('|')[7] not in file_poor:
  file_poor[message.value.decode().split('|')[7]] = 0

with open(file_name, 'a', encoding='utf-8')as f1:
  f1.write(message.value.decode())
  file_poor[message.value.decode().split('|')[7]] += 1

# 触发切换文件的操作,用时间戳生成第二文件名
 if file_poor[message.value.decode().split('|')[7]] == strip_number:
  time_stamp_poor[message.value.decode().split('|')[7]] = utils.get_time_stamp()
  file_poor[message.value.decode().split('|')[7]] = 0

来源:https://blog.csdn.net/Beyond_F4/article/details/80590082

标签:python,变量名,赋值
0
投稿

猜你喜欢

  • python重试装饰器的简单实现方法

    2022-07-15 12:54:10
  • Microsoft Office Access 2007使用技巧

    2008-05-23 13:23:00
  • 利用SQLyogEnt对Mysql数据库进行转移

    2012-02-25 20:17:30
  • 将函数的实际参数转换成数组的方法

    2024-04-18 09:59:56
  • Python字符串的字符转换、字符串劈分、字符串合并问题分析

    2021-07-22 19:14:24
  • 详解OpenCV图像的概念和基本操作

    2021-07-22 02:05:30
  • python回溯法实现数组全排列输出实例分析

    2023-06-18 20:42:27
  • python简单实现基数排序算法

    2023-11-10 06:27:27
  • Win7 x64 IIS运行ASP+Access故障完美解决方法(转)

    2012-03-27 18:30:35
  • Django 项目布局方法(值得推荐)

    2022-08-22 12:44:22
  • python中windows链接linux执行命令并获取执行状态的问题小结

    2022-05-18 12:56:55
  • 使用tensorflow进行音乐类型的分类

    2021-02-22 16:58:31
  • Django通过json格式收集主机信息

    2022-03-23 19:58:49
  • 瞬间的设计(四)【碳酸饮料会】

    2009-12-23 13:56:00
  • ASP.NET MVC实现区域或城市选择

    2023-07-13 17:50:00
  • python中elasticsearch_dsl模块的使用方法

    2022-03-23 07:57:31
  • XML文件的显示——CSS和XSL

    2007-10-15 18:48:00
  • 详解vue-cli 本地开发mock数据使用方法

    2024-05-10 14:20:21
  • 分享十个Python提高工作效率的自动化脚本

    2023-12-20 21:34:05
  • mysql导入导出数据中文乱码解决方法小结

    2024-01-19 17:47:45
  • asp之家 网络编程 m.aspxhome.com