Friday, 15 May 2015

Python multiprocessing: parsing, editing, and writing long series of csv files -


I have a very long series of similar CVS files (14 GB perfectly). I need to open each file, change some characters, and write a certain version in a new file. I want to use the processing power of my multicore computer. I tried with mp.Pools and mp.Process / mp.Queue. The pool version works, but the Q view produces this error:

IOError: [Errno 22] Invalid mode ('R') or file name: '& lt; Multiprocessing.queues.Queue object 0x0000000002775A90 & gt; '

This is a simplified version of my pool code:

  Importing OS import pandes as PP import multiprocessing, MP Def Fixer (a_file): Open_file = open (a_file) for each_line in lines = [] Opened_file: lines.append (each_line.Relest ('nuts', 'rational')) open_file.close () df = pd.DataFrame (rows) # Some Punds Magazine Here DF .to_csv (a_file [: - 4] + '_fixed.csv') If __name__ == "__main__": my_path = os.getcwd () my_files = list (o_wath (my_path)) [0] [ 2] # I bus processor = mp.cpu_count () pool = mp.Pool (process = PRO) List of ESR) # files # I set up many processes as processors of your computer. Pool.map (fixer, my_files) and this Q is for the approach:  
  Import imported PDs to PDF importers as MPPF multiprocessing Open_file = open (a_file) for each_line in Open_file: lines .append (each_line.rele ('crazy', 'rational')) open_file.close () df = pd.DataFrame (rows) #some pandas magics here df.to_csv (A_file [: - 4] + '_fixed.csv') If __name__ == "__main__": my_path = os.getcwd () my_files = list (oswalk (my_path)) [0] [2] # i List only the file names in the processor = mp.cpu_count () in my_files Queue = mp.Queue () for each_file: queue.put (each_file) processes = [mp.process (target = fixer), process.start (process.start) for processes: process.join ()  

I appreciate if the process is processed in the core (processor)] for the core in args = (queue,)) you can provide an example to the Qiv version to work. In the second processing phase, before the files are written, I need a processor so that intermediate results can be obtained and some can be calculated. That's why I need queues.

The problem in the queue script is that I was not receiving the next element in the queue, but the fixer The full queue for the ceremony is going on. This problem is resolved by specifying queue.get () in the fixer function:

  Import OS Import Panda PD Import as MP Multi Processing Diff fixer (a_queue): a_file = a_queue.get () rows = [] open_file = open_file for each_line (an_file): lines.append (each_line.rele ('crazy', 'rational')) open_file.close ( ) Df = Pd.DataFrame (lines) # Some Pandas magazines here df.to_csv (a_file [: - 4] + '_fixed.csv') if __name__ == "__main__": my_path = os.getcwd () my_files = list ( OS Walking (my_path)) [0] [2] # I'm getting the list of file names for each file in the processor = mp.cpu_count () queue = mp.Queue () my_files: queue.put (every_file) process = Process for Process Process (Process) in Mp Process (target = Fixer, Args = (Qi,)) for Processors: Processes: process.start () Processes for Process: process.join ()  

No comments:

Post a Comment