Python读取文件的操作方法,python中读取文件的几种方法的区别

  Python读取文件的操作方法,python中读取文件的几种方法的区别

  本文将从核酸采集的实际问题出发,详细讲解Python读取文件的四种方式。文中的样例代码讲解的很详细,感兴趣的朋友可以看看。

  00-1010学生人数特别少的情况下停车场空间不够怎么办?如何加快执行效率?如何加快处理速度?后记背景:最近在处理维基百科数据时,发现以前的文件读取和数据处理方法因为数据量大,几乎不可用或者耗时很长。今天学校安排了一次统一的核酸测试,恰好和文件阅读的过程很像。正好借此机会跟你从头梳理一下几种文件阅读方法。

  地点:现在学校要求所有学生收集核酸。每个学生在宿舍等待门卫(以下简称“大白”)的召唤。给自己打电话的时候,他去停车场排队等大白来接自己。采集结束后,样品由大白统一有序的收集保存。

  名词解释:

  学生:所有学生都是一个大文件,每个学生都是数据行中的一行。宿舍:硬盘停车场:内存中的核酸采集:数据处理样本:处理后的数据:程序。

  

目录

  当学生人数特别少的时候,可以考虑把所有学生叫到停车场等待,然后依次采集核酸。

  方法1:简单情况

  此时的程序可以模拟为:

  导入时间

  从输入导入列表

  def pick _ all _ students(dorm : str)-List[str]:

  用open(dorm, rt ,encoding=utf8 )作为fin:

  学生=fin.readlines()

  返校学生

  def pick _ sample(student : str)-str :

  时间.睡眠(0.01)

  sample=f“{ student . strip()}”的示例

  返回样品

  定义过程(dorm: str,sample _ store room : str)-none :

  用open(样品库, wt ,编码=utf8 )作为fout:

  学生=挑选所有学生(宿舍)

  对于学生:

  sample=pick_sample(学生)

  fout . write(f“{ sample } \ n )

  fout.flush()

  if __name__==__main__:

  流程(

  学生姓名. txt ,

   sample _ storeroom.txt

  )

  注意,第19行,大白把所有同学一次性叫到停车场。这种做法在学生少的时候做的很快,但是学生多了停车场容纳不下怎么办?

  

学生数量特别少的情况

  方法二:边读边加工。

  一般来说,由于车位有限,我们不会一次性把所有学生叫到停车场,而是一个一个处理,这样可以节省内存空间。

  导入时间

  通过键入导入迭代器

  def pick _ one _ student(dorm : str)-迭代器[str]:

  用open(dorm, rt ,encoding=utf8 )作为fin:

  对于fin:中的学生

  屈服学生

  def pick _ sample(student : str)-str :

  时间.睡眠(0.01)

  sample=f“{ student . strip()}”的示例

  返回样品

  定义过程(dorm: str,sample _ store room : str)-none :

  wi

  th open(sample_storeroom, "wt", encoding="utf8") as fout:

   for student in pick_one_student(dorm):

   sample = pick_sample(student)

   fout.write(f"{sample}\n")

   fout.flush()

  if __name__ == "__main__":

   process(

   "student_names.txt",

   "sample_storeroom.txt"

   )

  

  这里pick_one_student函数中的返回值是用yield返回的,一次只会返回一名同学。

  不过,这种做法虽然确保了停车场不会满员,但是这种做法在人数特别多的时候就不再适合了。虽然可以保证完成任务,但由于每次只能采集一个同学,程序的执行并不高。特别是当你的CPU有多个核时,会浪费机器性能,出现一核有难,其它围观的现象。

  

  

  

怎么加快执行效率?

  大家可能也已经注意到了,刚刚我们的场景中,不论采用哪种方法,都只有一名大白在工作。那我们能不能加派人手,从而提高效率呢?

  答案当然是可行的。我们现在先考虑增加两名大白,使得一名大白专注于叫号,安排学生进入停车场,另外一名大白专注于采集核酸,最后一名大白用于存储核酸样本。

  

  方法三

  

import time

  from multiprocessing import Queue, Process

  from typing import Iterator

  def pick_student(stu_queue: Queue, dorm: str) -> Iterator[str]:

   print("pick_student: started")

   picked_num = 0

   with open(dorm, "rt", encoding="utf8") as fin:

   for student in fin:

   stu_queue.put(student)

   picked_num += 1

   if picked_num % 500 == 0:

   print(f"pick_student: {picked_num}")

   # end signal

   stu_queue.put(None)

   print("pick_student: finished")

  def pick_sample(student: str) -> str:

   time.sleep(0.01)

   sample = f"{student.strip()}s sample"

   return sample

  def process(stu_queue: Queue, store_queue: Queue) -> None:

   print("process: started")

   process_num = 0

   while True:

   student = stu_queue.get()

   if student is not None:

   sample = pick_sample(student)

   store_queue.put(sample)

   process_num += 1

   if process_num % 500 == 0:

   print(f"process: {process_num}")

   else:

   break

   # end signal

   store_queue.put(None)

   print("process: finished")

  def store_sample(store_queue: Queue, sample_storeroom: str) -> None:

   print("store_sample: started")

   store_num = 0

   with open(sample_storeroom, "wt", encoding="utf8") as fout:

   while True:

   sample = store_queue.get()

   if sample is not None:

   fout.write(f"{sample}\n")

   fout.flush()

   store_num += 1

   if store_num % 500 == 0:

   print(f"store_sample: {store_num}")

   else:

   break

   print("store_sample: finished")

  if __name__ == "__main__":

   dorm = "student_names.txt"

   sample_storeroom = "sample_storeroom.txt"

   stu_queue = Queue()

   store_queue = Queue()

   store_p = Process(target=store_sample, args=(store_queue, sample_storeroom), daemon=True)

   store_p.start()

   process_p = Process(target=process, args=(stu_queue, store_queue), daemon=True)

   process_p.start()

   read_p = Process(target=pick_student, args=(stu_queue, dorm), daemon=True)

   read_p.start()

   store_p.join()

  

  这份代码中,我们引入了多进程的思路,将每个大白看作一个进程,并使用了队列Queue作为进程间通信的媒介。stu_queue表示学生叫号进停车场的队列,store_queue表示已经采集过的待存储核酸样本的队列。

  此外,为了控制进程的停止,我们在pick_student和 process函数的最后都向各自队列中添加了None作为结束标志符。

  假设有1w名学生(student_names.txt文件有1w行),经过测试后发现上述方法的时间如下:

  

  • 方法一:1m40.716s
  • 方法二:1m40.717s
  • 方法三:1m41.097s

  咦?不是做了分工吗?怎么速度还变慢了?经笔者观察,这是因为叫号的大白速度太快了(文件读取速度快)通常是TA已经齐活了,另外俩人还在吭哧吭哧干活呢,体现不出来分工的优势。如果这个时候我们对法二和法三的叫号做延时操作,每个学生叫号之后停滞10ms再叫下一位学生,则方法三的处理时间几乎不变,而方法二的时间则会延长至3m21.345s。

  

  

怎么加快处理速度?

  上面提到,大白采核酸的时间较长,往往上一个人的核酸还没采完,下一个人就已经在后面等着了。我们能不能提高核酸采集这个动作(数据处理)的速度呢?其实一名大白执行一次核酸采集的时间我们几乎无法再缩短了,但是我们可以通过增加人手的方式,来达到这个目的。就像去银行办业务,如果开放的窗口越多,那么每个人等待的时间就会越短。这里我们也采取类似的策略,增加核酸采集的窗口。

  

  

import time

  from multiprocessing import Queue, Process, cpu_count

  from typing import Iterator

  def pick_student(stu_queue: Queue, dorm: str, num_workers: int) -> Iterator[str]:

   print("pick_student: started")

   picked_num = 0

   with open(dorm, "rt", encoding="utf8") as fin:

   for student in fin:

   stu_queue.put(student)

   picked_num += 1

   if picked_num % 500 == 0:

   print(f"pick_student: {picked_num}")

   # end signal

   for _ in range(num_workers):

   stu_queue.put(None)

   print("pick_student: finished")

  def pick_sample(student: str) -> str:

   time.sleep(0.01)

   sample = f"{student.strip()}s sample"

   return sample

  def process(stu_queue: Queue, store_queue: Queue) -> None:

   print("process: started")

   process_num = 0

   while True:

   student = stu_queue.get()

   if student is not None:

   sample = pick_sample(student)

   store_queue.put(sample)

   process_num += 1

   if process_num % 500 == 0:

   print(f"process: {process_num}")

   else:

   break

   print("process: finished")

  def store_sample(store_queue: Queue, sample_storeroom: str) -> None:

   print("store_sample: started")

   store_num = 0

   with open(sample_storeroom, "wt", encoding="utf8") as fout:

   while True:

   sample = store_queue.get()

   if sample is not None:

   fout.write(f"{sample}\n")

   fout.flush()

   store_num += 1

   if store_num % 500 == 0:

   print(f"store_sample: {store_num}")

   else:

   break

   print("store_sample: finished")

  if __name__ == "__main__":

   dorm = "student_names.txt"

   sample_storeroom = "sample_storeroom.txt"

   num_process = max(1, cpu_count() - 1)

   maxsize = 10 * num_process

   stu_queue = Queue(maxsize=maxsize)

   store_queue = Queue(maxsize=maxsize)

   store_p = Process(target=store_sample, args=(store_queue, sample_storeroom), daemon=True)

   store_p.start()

   process_workers = []

   for _ in range(num_process):

   process_p = Process(target=process, args=(stu_queue, store_queue), daemon=True)

   process_p.start()

   process_workers.append(process_p)

   read_p = Process(target=pick_student, args=(stu_queue, dorm, num_process), daemon=True)

   read_p.start()

   for worker in process_workers:

   worker.join()

   # end signal

   store_queue.put(None)

   store_p.join()

  总耗时 0m4.160s !我们来具体看看其中的细节部分:

  首先我们将CPU核数 - 3作为采核酸的大白数量。这里减3是为其它工作进程保留了一些资源,你也可以根据自己的具体情况做调整

  这次我们在 Queue中增加了 maxsize参数,这个参数是限制队列的最大长度,这个参数通常与你的实际内存情况有关。如果数据特别多时要考虑做些调整。这里我采用10倍的工作进程数目作为队列的长度

  注意这里pick_student函数中要为每个后续的工作进程都添加一个结束标志,因此最后会有个for循环

  我们把之前放在process函数中的结束标志提取出来,放在了最外侧,使得所有工作进程均结束之后再关闭最后的store_p进程

  

  

结语

  总结来说,如果你的数据集特别小,用法一;通常情况下用法二;数据集特别大时用法四。

  以上就是Python读取文件的四种方式的实例详解的详细内容,更多关于Python读取文件的资料请关注盛行IT软件开发工作室其它相关文章!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: