How to count word frequencies in a huge file concurrently?


I need to count the word frequencies of a 3 GB gzipped plain-text file of English sentences, which is about 30 GB when unzipped.



I have a single-threaded script using collections.Counter and gzip.open, and it takes hours to finish.



Since reading the file line by line is much faster than splitting and counting, I am thinking about a producer-consumer flow: a file reader produces lines, several consumers do the splitting and counting, and at the end the Counters are merged to get the word occurrences.



However, I cannot find an example of ProcessPoolExecutor that sends a queue to the Executor; the examples just map a single item from a list.
There are only single-threaded examples for asyncio.Queue.



  • It is a huge file, so I cannot read the whole file into a list before counting, and thus I cannot use concurrent.futures.Executor.map. But all the examples I have read start from a fixed list.


  • The time to split and count one sentence is comparable to the time to fork a process, so I have to make each consumer process live longer. I do not think map can merge Counters, so I cannot use chunksize > 1. Thus I have to give the consumers a queue and make them keep counting until the whole file is finished. But most examples send only one item to a consumer and use chunksize=1000 to reduce the number of forks.


Would you write an example for me?



I hope the code is backward compatible with Python 3.5.3, since PyPy is faster.




My real case is for a more specific file format:



chr1 10011 141 0 157 4 41 50
chr1 10012 146 1 158 4 42 51
chr1 10013 150 0 163 4 43 53
chr1 10014 164 3 167 4 44 54


I need to build a histogram for each single column from column 3 to column 8.
So I took word frequencies as a simpler example.



My code is:



#!/usr/bin/env pypy3
import sys

SamplesList = ('D_Crick', 'D_Watson', 'Normal_Crick', 'Normal_Watson', 'D_WGS', 'Normal_WGS')

def main():
    import math

    if len(sys.argv) < 3 :
        print('Usage:',sys.argv[0],'<samtools.depth.gz> <out.tsv> [verbose=0]',file=sys.stderr,flush=True)
        exit(0)
    try:
        verbose = int(sys.argv[3])
    except: # `except IndexError:` and `except ValueError:`
        verbose = 0

    inDepthFile = sys.argv[1]
    outFile = sys.argv[2]
    print('From:[{}], To:[{}].\nVerbose: [{}].'.format(inDepthFile,outFile,verbose),file=sys.stderr,flush=True)
    RecordCnt,MaxDepth,cDepthCnt,cDepthStat = inStat(inDepthFile,verbose)
    for k in SamplesList:
        cDepthStat[k][2] = cDepthStat[k][0] / RecordCnt  # E(X)
        cDepthStat[k][3] = cDepthStat[k][1] / RecordCnt  # E(X^2)
        cDepthStat[k][4] = math.sqrt(cDepthStat[k][3] - cDepthStat[k][2]*cDepthStat[k][2])  # sqrt(E(X^2)-E(X)^2)
    tsvout = open(outFile, 'wt')
    print('#{}\t{}'.format('Depth','\t'.join(SamplesList)),file=tsvout)
    #RecordCntLength = len(str(RecordCnt))
    print( '#N={},SD:\t{}'.format(RecordCnt,'\t'.join(str(round(cDepthStat[col][4],1)) for col in SamplesList)),file=tsvout)
    for depth in range(0,MaxDepth+1):
        print( '{}\t{}'.format(depth,'\t'.join(str(cDepthCnt[col][depth]) for col in SamplesList)),file=tsvout)
    tsvout.close()
    pass

def inStat(inDepthFile,verbose):
    import gzip
    import csv
    from collections import Counter
    # Looking up things in global scope takes longer than looking up stuff in local scope. <https://stackoverflow.com/a/54645851/159695>
    cDepthCnt = {key:Counter() for key in SamplesList}
    cDepthStat = {key:[0,0,0,0,0] for key in SamplesList}  # sum(x) and sum(x^2)
    RecordCnt = 0
    MaxDepth = 0
    with gzip.open(inDepthFile, 'rt') as tsvin:
        tsvin = csv.DictReader(tsvin, delimiter='\t', fieldnames=('ChrID','Pos')+SamplesList )
        try:
            for row in tsvin:
                RecordCnt += 1
                for k in SamplesList:
                    theValue = int(row[k])
                    if theValue > MaxDepth:
                        MaxDepth = theValue
                    cDepthCnt[k][theValue] += 1  # PyPy3:29.82 ns, Python3:30.61 ns
                    cDepthStat[k][0] += theValue
                    cDepthStat[k][1] += theValue * theValue
            #print(MaxDepth,DepthCnt)
        except KeyboardInterrupt:
            print('\n[!]Ctrl+C pressed.',file=sys.stderr,flush=True)
            pass
    print('[!]Lines Read:[{}], MaxDepth is [{}].'.format(RecordCnt,MaxDepth),file=sys.stderr,flush=True)
    return RecordCnt,MaxDepth,cDepthCnt,cDepthStat

if __name__ == "__main__":
    main()  # time python3 ./samdepthplot.py t.tsv.gz 1



Profiling with cProfile shows that csv.DictReader takes most of the time.




My problem is that, although the gzip reader is fast and the csv reader is fast, I need to count billions of lines, and the csv reader is definitely slower than the gzip reader.



So I need to spread the lines across different csv-reader worker processes and do the downstream counting separately. It would be convenient to use a queue between one producer and many consumers.
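
Conceptually, something like this rough, untested sketch is what I am after (the worker count and the file name are just placeholders):

import gzip
from collections import Counter
from multiprocessing import Process, Queue

def consumer(in_q, out_q):
    # each consumer keeps a private Counter and sends it back once at the end
    cnt = Counter()
    for line in iter(in_q.get, None):        # None is the "no more work" sentinel
        cnt.update(line.split())
    out_q.put(cnt)

def produce(path, in_q, nworkers):
    with gzip.open(path, 'rt') as f:
        for line in f:
            in_q.put(line)
    for _ in range(nworkers):                # one sentinel per consumer
        in_q.put(None)

if __name__ == '__main__':
    nworkers = 4                             # placeholder
    in_q, out_q = Queue(maxsize=10000), Queue()
    workers = [Process(target=consumer, args=(in_q, out_q)) for _ in range(nworkers)]
    for w in workers:
        w.start()
    produce('sentences.txt.gz', in_q, nworkers)   # placeholder file name
    total = Counter()
    for _ in range(nworkers):
        total.update(out_q.get())            # merge the per-worker Counters
    for w in workers:
        w.join()
    print(total.most_common(10))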



Since I am using Python, not C, is there some abstracted wrapper around multiprocessing and queues? Is it possible to use ProcessPoolExecutor with the Queue class?










  • I know you lightly touched on how you're doing this but can you include the code you're using currently? – Jab, Mar 9 at 1:09












  • I wonder if you might not get better performance by just using a shell pipeline? See Command-line Tools can be 235x Faster than your Hadoop Cluster. This problem sounds like a great fit for xargs and uniq -c, with maybe some awk scripting to glue it all together. – Daniel Pryden, Mar 9 at 1:12











  • Have you looked into using io.BufferedReader? As explained in Reading & Writing GZIP Files Faster in Python. – Jab, Mar 9 at 1:14












  • You could treat the gzipped file as a giant random-access list of lines without reading the whole thing into memory, using something similar to what's being done in this answer, only with an mmap instead of a temporary file (I have an un-posted version which does this). The memory map could then be passed to multiple concurrent subprocesses along with a starting line number and line count. Each subprocess could count the words in the section assigned to it and pass back a dictionary when finished. These dictionaries could all be merged together. – martineau, Mar 9 at 2:09











  • I included my code now. – Galaxy, Mar 9 at 3:34

















python parallel-processing queue process-pool






asked Mar 9 at 1:06 by Galaxy, edited Mar 10 at 14:33











5 Answers














I've never tested this code, but it should work.

The first thing is to check the number of lines:

f = 'myfile.txt'
def file_len(f):
    with open(f) as f:
        for i, l in enumerate(f):
            pass
    return i + 1
num_lines = file_len(f)

Split the data into n partitions:

n = 8  # number of worker processes, for example
split_size = num_lines//n if num_lines//n > 0 else 1
parts = [x for x in range(0, num_lines, split_size)]

And now start the jobs:

from multiprocessing import Process
import linecache
jobs = []

for part in range(len(parts)):
    p = Process(target = function_here, args = ('myfile.txt', parts[part], split_size))
    jobs.append(p)
    p.start()

for p in jobs:
    p.join()

An example of the function:

def function_here(your_file_name, line_number, split_size):
    for current_line in range(line_number, (line_number+split_size)+1):
        print( linecache.getline(your_file_name, current_line))

Still, you will need to check the number of lines before doing any operation.

– kaihami, answered Mar 9 at 1:53, edited Mar 9 at 2:07






  • I am reading a gzip file and I prefer not to unzip it. – Galaxy, Mar 9 at 3:40











  • I read that ProcessPoolExecutor is a simplified abstraction over multiprocessing. Would it be simpler to use ProcessPoolExecutor? – Galaxy, Mar 9 at 4:58
































A 30 GB text file is big enough to put your question into the realm of big data, so to tackle this problem I suggest using big-data tools like Hadoop and Spark. What you explained as a "producer-consumer flow" is basically what the MapReduce algorithm is designed for. Word-frequency counting is a typical MapReduce problem; look it up and you will find tons of examples.

– LoMaPh, answered Mar 9 at 2:26
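
For illustration, a word count in PySpark could look roughly like this (a minimal sketch, assuming Spark and the pyspark package are installed; the input path is a placeholder, and note that Spark reads a .gz file into a single partition, so splitting the input first parallelizes better):

from operator import add
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('wordcount').getOrCreate()
counts = (spark.sparkContext.textFile('sentences.txt.gz')   # placeholder path
          .flatMap(lambda line: line.split())                # one record per word
          .map(lambda word: (word, 1))
          .reduceByKey(add))                                  # sum the 1s per word
for word, n in counts.takeOrdered(20, key=lambda kv: -kv[1]):
    print(word, n)
spark.stop()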




















The idea is to break the huge file into smaller files, invoke many workers that each do the counting job and return a Counter, and finally merge the Counters.

from itertools import islice
from multiprocessing import Pool
from collections import Counter
import os

NUM_OF_LINES = 3
INPUT_FILE = 'huge.txt'
POOL_SIZE = 10


def slice_huge_file():
    cnt = 0
    with open(INPUT_FILE) as f:
        while True:
            next_n_lines = list(islice(f, NUM_OF_LINES))
            cnt += 1
            if not next_n_lines:
                break
            # writes the chunks as sub_huge_1.txt, sub_huge_2.txt, ...
            with open('sub_huge_{}.txt'.format(cnt), 'w') as out:
                out.writelines(next_n_lines)


def count_file_words(input_file):
    # note: this counts whole stripped lines; split each line if you want individual words
    with open(input_file, 'r') as f:
        return Counter([w.strip() for w in f.readlines()])


if __name__ == '__main__':
    slice_huge_file()
    pool = Pool(POOL_SIZE)
    sub_files = [os.path.join('.', f) for f in os.listdir('.') if f.startswith('sub_huge')]
    results = pool.map(count_file_words, sub_files)
    final_counter = Counter()
    for counter in results:
        final_counter += counter
    print(final_counter)

– balderman, answered Mar 9 at 16:00
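
If the input is gzipped, the slicing step can stream straight from gzip.open instead of an uncompressed file (a small, untested variation on slice_huge_file above; the file name and chunk size are placeholders):

import gzip
from itertools import islice

def slice_huge_gz_file(input_file='huge.txt.gz', num_of_lines=100000):
    # same idea as slice_huge_file(), but decompresses on the fly, so the full
    # 30 GB never has to exist on disk; only the small chunk files are written
    cnt = 0
    with gzip.open(input_file, 'rt') as f:
        while True:
            next_n_lines = list(islice(f, num_of_lines))
            if not next_n_lines:
                break
            cnt += 1
            with open('sub_huge_{}.txt'.format(cnt), 'w') as out:
                out.writelines(next_n_lines)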



















Just some pseudocode:

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
import traceback


WORKER_POOL_SIZE = 10  # you should set this to the number of your processes
QUEUE_SIZE = 100  # 10 times your pool size is good enough


def main():
    with Manager() as manager:
        q = manager.Queue(QUEUE_SIZE)

        # init worker pool
        executor = ProcessPoolExecutor(max_workers=WORKER_POOL_SIZE)
        workers_pool = [executor.submit(worker, i, q) for i in range(WORKER_POOL_SIZE)]

        # start producer
        run_producer(q)

        # wait until done
        for f in workers_pool:
            try:
                f.result()
            except Exception:
                traceback.print_exc()


def run_producer(q):
    try:
        with open("your file path") as fp:
            for line in fp:
                q.put(line)
    except Exception:
        traceback.print_exc()
    finally:
        q.put(None)


def worker(i, q):
    while 1:
        line = q.get()
        if line is None:
            print(f'worker {i} is done')
            q.put(None)
            return

        # do something with this line
        # ...





  • How do I get the result after a worker is done? I got many <Future at 0x1078d3780 state=running> but no results. – Galaxy, Mar 14 at 3:23
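
One possible way to get results back from the workers (an untested sketch along the lines of the pseudocode above): have each worker accumulate a Counter and return it, then merge the futures' results in main() after the producer has finished:

from collections import Counter

def worker(i, q):
    # same loop as in the pseudocode, but accumulate a Counter and return it
    cnt = Counter()
    for line in iter(q.get, None):
        cnt.update(line.split())
    q.put(None)                        # re-post the sentinel for the other workers
    print(f'worker {i} is done')
    return cnt

# back in main(), after run_producer(q) has returned:
#     total = Counter()
#     for f in workers_pool:
#         total.update(f.result())     # f.result() is the Counter that worker returned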
































I learned the multiprocessing library over the weekend.

Stopping on Ctrl+C and writing out the current result are still not working.

The main function is fine now.



#!/usr/bin/env pypy3
import sys
from collections import Counter
from multiprocessing import Pool, Process, Manager, current_process, freeze_support

SamplesList = ('D_Crick', 'D_Watson', 'Normal_Crick', 'Normal_Watson', 'D_WGS', 'Normal_WGS')

ChunkSize = 1024 * 128
verbose = 0
Nworkers = 16

def main():
    import math

    if len(sys.argv) < 3 :
        print('Usage:',sys.argv[0],'<samtools.depth.gz> <out.tsv> [verbose=0]',file=sys.stderr,flush=True)
        exit(0)
    try:
        verbose = int(sys.argv[3])
    except: # `except IndexError:` and `except ValueError:`
        verbose = 0

    inDepthFile = sys.argv[1]
    outFile = sys.argv[2]
    print('From:[{}], To:[{}].\nVerbose: [{}].'.format(inDepthFile,outFile,verbose),file=sys.stderr,flush=True)
    RecordCnt,MaxDepth,cDepthCnt,cDepthStat = CallStat(inDepthFile)
    for k in SamplesList:
        cDepthStat[k][2] = cDepthStat[k][0] / RecordCnt  # E(X)
        cDepthStat[k][3] = cDepthStat[k][1] / RecordCnt  # E(X^2)
        cDepthStat[k][4] = math.sqrt(cDepthStat[k][3] - cDepthStat[k][2]*cDepthStat[k][2])  # sqrt(E(X^2)-E(X)^2)
    tsvout = open(outFile, 'wt')
    print('#{}\t{}'.format('Depth','\t'.join(SamplesList)),file=tsvout)
    #RecordCntLength = len(str(RecordCnt))
    print( '#N={},SD:\t{}'.format(RecordCnt,'\t'.join(str(round(cDepthStat[col][4],1)) for col in SamplesList)),file=tsvout)
    for depth in range(0,MaxDepth+1):
        #print( '{}\t{}'.format(depth,'\t'.join(str(DepthCnt[col][depth]) for col in SamplesList)) )
        #print( '{}\t{}'.format(depth,'\t'.join(str(yDepthCnt[depth][col]) for col in SamplesList)) )
        print( '{}\t{}'.format(depth,'\t'.join(str(cDepthCnt[col][depth]) for col in SamplesList)),file=tsvout)
        #pass
    #print('#MaxDepth={}'.format(MaxDepth),file=tsvout)
    tsvout.close()
    pass

def CallStat(inDepthFile):
    import gzip
    import itertools
    RecordCnt = 0
    MaxDepth = 0
    cDepthCnt = {key:Counter() for key in SamplesList}
    cDepthStat = {key:[0,0,0,0,0] for key in SamplesList}  # sum(x) and sum(x^2)
    #lines_queue = Queue()
    manager = Manager()
    lines_queue = manager.Queue()
    stater_pool = Pool(Nworkers)
    TASKS = itertools.repeat((lines_queue,SamplesList),Nworkers)
    #ApplyResult = [stater_pool.apply_async(iStator,x) for x in TASKS]
    #MapResult = stater_pool.map_async(iStator,TASKS,1)
    AsyncResult = stater_pool.imap_unordered(iStator,TASKS,1)
    try:
        with gzip.open(inDepthFile, 'rt') as tsvfin:
            while True:
                lines = tsvfin.readlines(ChunkSize)
                lines_queue.put(lines)
                if not lines:
                    for i in range(Nworkers):
                        lines_queue.put(b'\n\n')
                    break
    except KeyboardInterrupt:
        print('\n[!]Ctrl+C pressed.',file=sys.stderr,flush=True)
        for i in range(Nworkers):
            lines_queue.put(b'\n\n')
        pass
    #for results in ApplyResult:
    #    (iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) = results.get()
    #for (iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) in MapResult.get():
    for (iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) in AsyncResult:
        RecordCnt += iRecordCnt
        if iMaxDepth > MaxDepth:
            MaxDepth = iMaxDepth
        for k in SamplesList:
            cDepthCnt[k].update(icDepthCnt[k])
            cDepthStat[k][0] += icDepthStat[k][0]
            cDepthStat[k][1] += icDepthStat[k][1]
    return RecordCnt,MaxDepth,cDepthCnt,cDepthStat

#def iStator(inQueue,inSamplesList):
def iStator(args):
    (inQueue,inSamplesList) = args
    import csv
    # Looking up things in global scope takes longer than looking up stuff in local scope. <https://stackoverflow.com/a/54645851/159695>
    cDepthCnt = {key:Counter() for key in inSamplesList}
    cDepthStat = {key:[0,0] for key in inSamplesList}  # sum(x) and sum(x^2)
    RecordCnt = 0
    MaxDepth = 0
    for lines in iter(inQueue.get, b'\n\n'):
        try:
            tsvin = csv.DictReader(lines, delimiter='\t', fieldnames=('ChrID','Pos')+inSamplesList )
            for row in tsvin:
                #print(', '.join(row[col] for col in inSamplesList))
                RecordCnt += 1
                for k in inSamplesList:
                    theValue = int(row[k])
                    if theValue > MaxDepth:
                        MaxDepth = theValue
                    #DepthCnt[k][theValue] += 1   # PyPy3:30.54 ns, Python3:22.23 ns
                    #yDepthCnt[theValue][k] += 1  # PyPy3:30.47 ns, Python3:21.50 ns
                    cDepthCnt[k][theValue] += 1   # PyPy3:29.82 ns, Python3:30.61 ns
                    cDepthStat[k][0] += theValue
                    cDepthStat[k][1] += theValue * theValue
            #print(MaxDepth,DepthCnt)
        except KeyboardInterrupt:
            print('\n[!]Ctrl+C pressed.',file=sys.stderr,flush=True)
            pass
    #print('[!]{} Lines Read:[{}], MaxDepth is [{}].'.format(current_process().name,RecordCnt,MaxDepth),file=sys.stderr,flush=True)
    return RecordCnt,MaxDepth,cDepthCnt,cDepthStat

if __name__ == "__main__":
    main()  # time python3 ./samdepthplot.py t.tsv.gz 1





      share|improve this answer























        Your Answer






        StackExchange.ifUsing("editor", function ()
        StackExchange.using("externalEditor", function ()
        StackExchange.using("snippets", function ()
        StackExchange.snippets.init();
        );
        );
        , "code-snippets");

        StackExchange.ready(function()
        var channelOptions =
        tags: "".split(" "),
        id: "1"
        ;
        initTagRenderer("".split(" "), "".split(" "), channelOptions);

        StackExchange.using("externalEditor", function()
        // Have to fire editor after snippets, if snippets enabled
        if (StackExchange.settings.snippets.snippetsEnabled)
        StackExchange.using("snippets", function()
        createEditor();
        );

        else
        createEditor();

        );

        function createEditor()
        StackExchange.prepareEditor(
        heartbeatType: 'answer',
        autoActivateHeartbeat: false,
        convertImagesToLinks: true,
        noModals: true,
        showLowRepImageUploadWarning: true,
        reputationToPostImages: 10,
        bindNavPrevention: true,
        postfix: "",
        imageUploader:
        brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
        contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
        allowUrls: true
        ,
        onDemand: true,
        discardSelector: ".discard-answer"
        ,immediatelyShowMarkdownHelp:true
        );



        );













        draft saved

        draft discarded


















        StackExchange.ready(
        function ()
        StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55073002%2fhow-to-count-word-frequencies-in-a-huge-file-concurrently%23new-answer', 'question_page');

        );

        Post as a guest















        Required, but never shown

























        5 Answers
        5






        active

        oldest

        votes








        5 Answers
        5






        active

        oldest

        votes









        active

        oldest

        votes






        active

        oldest

        votes









        0














        I've never tested this code, but should work.



        The first thing is to check the number of lines



        f =('myfile.txt')
        def file_len(f):
        with open(f) as f:
        for i, l in enumerate(f):
        pass
        return i + 1
        num_lines = file_len(f)


        split the data in n partitions



        n = threads (8 for example)
        split_size = num_lines//n if num_lines//n > 0 else 1
        parts = [x for x in range(0, num_lines, split_size)]


        And now start the jobs:



        from multiprocessing import Process
        import linecache
        jobs = []

        for part in range(len(parts)):
        p = Process(target = function_here, args = ('myfile.txt', parts[part], split_size))
        jobs.append(p)
        p.start()

        for p in jobs:
        p.join()


        An example of the function



        def function_here(your_file_name, line_number, split_size):

        for current_line in range(line_number, (line_number+split_size)+1):
        print( linecache.getline(your_file_name, current_line))


        Still, you will need to check the number of lines before doing any operation






        share|improve this answer

























        • I am reading a gzip file and I prefer not to unzip it.

          – Galaxy
          Mar 9 at 3:40











        • I read ProcessPoolExecutor is a simplized abstract of multiprocessing. Is this simpler to use ProcessPoolExecutor?

          – Galaxy
          Mar 9 at 4:58















        0














        I've never tested this code, but should work.



        The first thing is to check the number of lines



        f =('myfile.txt')
        def file_len(f):
        with open(f) as f:
        for i, l in enumerate(f):
        pass
        return i + 1
        num_lines = file_len(f)


        split the data in n partitions



        n = threads (8 for example)
        split_size = num_lines//n if num_lines//n > 0 else 1
        parts = [x for x in range(0, num_lines, split_size)]


        And now start the jobs:



        from multiprocessing import Process
        import linecache
        jobs = []

        for part in range(len(parts)):
        p = Process(target = function_here, args = ('myfile.txt', parts[part], split_size))
        jobs.append(p)
        p.start()

        for p in jobs:
        p.join()


        An example of the function



        def function_here(your_file_name, line_number, split_size):

        for current_line in range(line_number, (line_number+split_size)+1):
        print( linecache.getline(your_file_name, current_line))


        Still, you will need to check the number of lines before doing any operation






        share|improve this answer

























        • I am reading a gzip file and I prefer not to unzip it.

          – Galaxy
          Mar 9 at 3:40











        • I read ProcessPoolExecutor is a simplized abstract of multiprocessing. Is this simpler to use ProcessPoolExecutor?

          – Galaxy
          Mar 9 at 4:58













        0












        0








        0







        I've never tested this code, but should work.



        The first thing is to check the number of lines



        f =('myfile.txt')
        def file_len(f):
        with open(f) as f:
        for i, l in enumerate(f):
        pass
        return i + 1
        num_lines = file_len(f)


        split the data in n partitions



        n = threads (8 for example)
        split_size = num_lines//n if num_lines//n > 0 else 1
        parts = [x for x in range(0, num_lines, split_size)]


        And now start the jobs:



        from multiprocessing import Process
        import linecache
        jobs = []

        for part in range(len(parts)):
        p = Process(target = function_here, args = ('myfile.txt', parts[part], split_size))
        jobs.append(p)
        p.start()

        for p in jobs:
        p.join()


        An example of the function



        def function_here(your_file_name, line_number, split_size):

        for current_line in range(line_number, (line_number+split_size)+1):
        print( linecache.getline(your_file_name, current_line))


        Still, you will need to check the number of lines before doing any operation






        share|improve this answer















        I've never tested this code, but should work.



        The first thing is to check the number of lines



        f =('myfile.txt')
        def file_len(f):
        with open(f) as f:
        for i, l in enumerate(f):
        pass
        return i + 1
        num_lines = file_len(f)


        split the data in n partitions



        n = threads (8 for example)
        split_size = num_lines//n if num_lines//n > 0 else 1
        parts = [x for x in range(0, num_lines, split_size)]


        And now start the jobs:



        from multiprocessing import Process
        import linecache
        jobs = []

        for part in range(len(parts)):
        p = Process(target = function_here, args = ('myfile.txt', parts[part], split_size))
        jobs.append(p)
        p.start()

        for p in jobs:
        p.join()


        An example of the function



        def function_here(your_file_name, line_number, split_size):

        for current_line in range(line_number, (line_number+split_size)+1):
        print( linecache.getline(your_file_name, current_line))


        Still, you will need to check the number of lines before doing any operation







        share|improve this answer














        share|improve this answer



        share|improve this answer








        edited Mar 9 at 2:07

























        answered Mar 9 at 1:53









        kaihamikaihami

        312411




        312411












        • I am reading a gzip file and I prefer not to unzip it.

          – Galaxy
          Mar 9 at 3:40











        • I read ProcessPoolExecutor is a simplized abstract of multiprocessing. Is this simpler to use ProcessPoolExecutor?

          – Galaxy
          Mar 9 at 4:58

















        • I am reading a gzip file and I prefer not to unzip it.

          – Galaxy
          Mar 9 at 3:40











        • I read ProcessPoolExecutor is a simplized abstract of multiprocessing. Is this simpler to use ProcessPoolExecutor?

          – Galaxy
          Mar 9 at 4:58
















        I am reading a gzip file and I prefer not to unzip it.

        – Galaxy
        Mar 9 at 3:40





        I am reading a gzip file and I prefer not to unzip it.

        – Galaxy
        Mar 9 at 3:40













        I read ProcessPoolExecutor is a simplized abstract of multiprocessing. Is this simpler to use ProcessPoolExecutor?

        – Galaxy
        Mar 9 at 4:58





        I read ProcessPoolExecutor is a simplized abstract of multiprocessing. Is this simpler to use ProcessPoolExecutor?

        – Galaxy
        Mar 9 at 4:58













        0














        A 30 GB text file is big enough to put your question into the realm of Big-Data. So to tackle this problem I suggest using Big-Data tools like Hadoop and Spark. What you explained as a "producer-consumer flow" is basically what MapReduce algorithm is designed for. The word count frequency is a typical MapReduce problem. Look it up, you will find tons of examples.






        share|improve this answer



























          0














          A 30 GB text file is big enough to put your question into the realm of Big-Data. So to tackle this problem I suggest using Big-Data tools like Hadoop and Spark. What you explained as a "producer-consumer flow" is basically what MapReduce algorithm is designed for. The word count frequency is a typical MapReduce problem. Look it up, you will find tons of examples.






          share|improve this answer

























            0












            0








            0







            A 30 GB text file is big enough to put your question into the realm of Big-Data. So to tackle this problem I suggest using Big-Data tools like Hadoop and Spark. What you explained as a "producer-consumer flow" is basically what MapReduce algorithm is designed for. The word count frequency is a typical MapReduce problem. Look it up, you will find tons of examples.






            share|improve this answer













            A 30 GB text file is big enough to put your question into the realm of Big-Data. So to tackle this problem I suggest using Big-Data tools like Hadoop and Spark. What you explained as a "producer-consumer flow" is basically what MapReduce algorithm is designed for. The word count frequency is a typical MapReduce problem. Look it up, you will find tons of examples.







            share|improve this answer












            share|improve this answer



            share|improve this answer










            answered Mar 9 at 2:26









            LoMaPhLoMaPh

            3311714




            3311714





















                0














                The idea is to break the huge file into smaller files. Invoke many workers that will do the count job and return a Counter.
                Finally merge the counters.



                from itertools import islice
                from multiprocessing import Pool
                from collections import Counter
                import os

                NUM_OF_LINES = 3
                INPUT_FILE = 'huge.txt'
                POOL_SIZE = 10


                def slice_huge_file():
                cnt = 0
                with open(INPUT_FILE) as f:
                while True:
                next_n_lines = list(islice(f, NUM_OF_LINES))
                cnt += 1
                if not next_n_lines:
                break
                with open('sub_huge_.txt'.format(cnt), 'w') as out:
                out.writelines(next_n_lines)


                def count_file_words(input_file):
                with open(input_file, 'r') as f:
                return Counter([w.strip() for w in f.readlines()])


                if __name__ == '__main__':
                slice_huge_file()
                pool = Pool(POOL_SIZE)
                sub_files = [os.path.join('.',f) for f in os.listdir('.') if f.startswith('sub_huge')]
                results = pool.map(count_file_words, sub_files)
                final_counter = Counter()
                for counter in results:
                final_counter += counter
                print(final_counter)





                share|improve this answer



























                  0














                  The idea is to break the huge file into smaller files. Invoke many workers that will do the count job and return a Counter.
                  Finally merge the counters.



                  from itertools import islice
                  from multiprocessing import Pool
                  from collections import Counter
                  import os

                  NUM_OF_LINES = 3
                  INPUT_FILE = 'huge.txt'
                  POOL_SIZE = 10


                  def slice_huge_file():
                  cnt = 0
                  with open(INPUT_FILE) as f:
                  while True:
                  next_n_lines = list(islice(f, NUM_OF_LINES))
                  cnt += 1
                  if not next_n_lines:
                  break
                  with open('sub_huge_.txt'.format(cnt), 'w') as out:
                  out.writelines(next_n_lines)


                  def count_file_words(input_file):
                  with open(input_file, 'r') as f:
                  return Counter([w.strip() for w in f.readlines()])


                  if __name__ == '__main__':
                  slice_huge_file()
                  pool = Pool(POOL_SIZE)
                  sub_files = [os.path.join('.',f) for f in os.listdir('.') if f.startswith('sub_huge')]
                  results = pool.map(count_file_words, sub_files)
                  final_counter = Counter()
                  for counter in results:
                  final_counter += counter
                  print(final_counter)





answered Mar 9 at 16:00 by balderman
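Writing the slices back to disk roughly doubles the I/O for a 30 GB corpus. As a variant of the same split-count-merge idea (my sketch, not part of the answer above; the gzip path and LINES_PER_CHUNK are placeholder values), the decompressed lines can be fed to the pool in chunks without any intermediate files:

import gzip
from collections import Counter
from itertools import islice
from multiprocessing import Pool

POOL_SIZE = 10
LINES_PER_CHUNK = 100_000          # tune to your memory budget
INPUT_FILE = 'huge.txt.gz'         # placeholder path


def count_chunk(lines):
    # count individual words in one chunk of lines
    return Counter(word for line in lines for word in line.split())


def read_chunks(path):
    with gzip.open(path, 'rt') as f:
        while True:
            chunk = list(islice(f, LINES_PER_CHUNK))
            if not chunk:
                return
            yield chunk


if __name__ == '__main__':
    final_counter = Counter()
    with Pool(POOL_SIZE) as pool:
        # partial Counters are merged as soon as any worker finishes a chunk
        for partial in pool.imap_unordered(count_chunk, read_chunks(INPUT_FILE)):
            final_counter.update(partial)
    print(final_counter.most_common(10))

Decompression still happens in the single reader process, so the speed-up is limited to the split-and-count part, which is usually the bottleneck anyway.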



































just some pseudocode:

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
import traceback


WORKER_POOL_SIZE = 10  # set this to the number of processes you want
QUEUE_SIZE = 100       # roughly 10x the pool size is good enough


def main():
    with Manager() as manager:
        q = manager.Queue(QUEUE_SIZE)

        # init worker pool
        executor = ProcessPoolExecutor(max_workers=WORKER_POOL_SIZE)
        workers_pool = [executor.submit(worker, i, q) for i in range(WORKER_POOL_SIZE)]

        # start producer
        run_producer(q)

        # wait until everything is done
        for f in workers_pool:
            try:
                f.result()
            except Exception:
                traceback.print_exc()


def run_producer(q):
    try:
        with open("your file path") as fp:
            for line in fp:
                q.put(line)
    except Exception:
        traceback.print_exc()
    finally:
        q.put(None)


def worker(i, q):
    while 1:
        line = q.get()
        if line is None:
            print(f'worker {i} is done')
            q.put(None)
            return

        # do something with this line
        # ...
answered Mar 11 at 7:17 by Laisky, edited Mar 11 at 8:22


• How to get the result after worker is done ? I got many <Future at 0x1078d3780 state=running> but no results.
  – Galaxy
  Mar 14 at 3:23
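Regarding the comment: the pseudocode above never returns anything from worker, so each future completes with None. A minimal sketch (my addition, assuming each worker does the word counting itself; 'huge.txt' is a placeholder path) where every worker builds its own Counter and returns it, so f.result() actually carries data once the sentinel arrives:

from collections import Counter
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager

WORKER_POOL_SIZE = 10
QUEUE_SIZE = 100


def worker(i, q):
    counter = Counter()
    while True:
        line = q.get()
        if line is None:
            q.put(None)      # pass the sentinel on so the other workers stop too
            return counter   # this becomes the future's result()
        counter.update(line.split())


def run_producer(q, path):
    with open(path) as fp:
        for line in fp:
            q.put(line)
    q.put(None)              # signal end of input


if __name__ == '__main__':
    with Manager() as manager:
        q = manager.Queue(QUEUE_SIZE)
        with ProcessPoolExecutor(max_workers=WORKER_POOL_SIZE) as executor:
            futures = [executor.submit(worker, i, q) for i in range(WORKER_POOL_SIZE)]
            run_producer(q, 'huge.txt')   # placeholder path
            total = Counter()
            for f in futures:
                total += f.result()       # merge per-worker counts
    print(total.most_common(10))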

























I learned the multiprocessing library over the weekend.

Stopping on Ctrl+C and writing out the partial result still does not work.

The main function works fine now.



#!/usr/bin/env pypy3
import sys
from collections import Counter
from multiprocessing import Pool, Process, Manager, current_process, freeze_support

SamplesList = ('D_Crick', 'D_Watson', 'Normal_Crick', 'Normal_Watson', 'D_WGS', 'Normal_WGS')

ChunkSize = 1024 * 128
verbose = 0
Nworkers = 16

def main():
    import math

    if len(sys.argv) < 3:
        print('Usage:', sys.argv[0], '<samtools.depth.gz> <out.tsv> [verbose=0]', file=sys.stderr, flush=True)
        exit(0)
    try:
        verbose = int(sys.argv[3])
    except:  # `except IndexError:` and `except ValueError:`
        verbose = 0

    inDepthFile = sys.argv[1]
    outFile = sys.argv[2]
    print('From:[{}], To:[{}].\nVerbose: [{}].'.format(inDepthFile, outFile, verbose), file=sys.stderr, flush=True)
    RecordCnt, MaxDepth, cDepthCnt, cDepthStat = CallStat(inDepthFile)
    for k in SamplesList:
        cDepthStat[k][2] = cDepthStat[k][0] / RecordCnt  # E(X)
        cDepthStat[k][3] = cDepthStat[k][1] / RecordCnt  # E(X^2)
        cDepthStat[k][4] = math.sqrt(cDepthStat[k][3] - cDepthStat[k][2]*cDepthStat[k][2])  # E(X^2)-E(X)^2
    tsvout = open(outFile, 'wt')
    print('#{}\t{}'.format('Depth', '\t'.join(SamplesList)), file=tsvout)
    #RecordCntLength = len(str(RecordCnt))
    print('#N={},SD:\t{}'.format(RecordCnt, '\t'.join(str(round(cDepthStat[col][4], 1)) for col in SamplesList)), file=tsvout)
    for depth in range(0, MaxDepth+1):
        #print( '{}\t{}'.format(depth, '\t'.join(str(DepthCnt[col][depth]) for col in SamplesList)) )
        #print( '{}\t{}'.format(depth, '\t'.join(str(yDepthCnt[depth][col]) for col in SamplesList)) )
        print('{}\t{}'.format(depth, '\t'.join(str(cDepthCnt[col][depth]) for col in SamplesList)), file=tsvout)
        #pass
    #print('#MaxDepth={}'.format(MaxDepth), file=tsvout)
    tsvout.close()
    pass

def CallStat(inDepthFile):
    import gzip
    import itertools
    RecordCnt = 0
    MaxDepth = 0
    cDepthCnt = {key: Counter() for key in SamplesList}
    cDepthStat = {key: [0, 0, 0, 0, 0] for key in SamplesList}  # x and x^2
    #lines_queue = Queue()
    manager = Manager()
    lines_queue = manager.Queue()
    stater_pool = Pool(Nworkers)
    TASKS = itertools.repeat((lines_queue, SamplesList), Nworkers)
    #ApplyResult = [stater_pool.apply_async(iStator,x) for x in TASKS]
    #MapResult = stater_pool.map_async(iStator,TASKS,1)
    AsyncResult = stater_pool.imap_unordered(iStator, TASKS, 1)
    try:
        with gzip.open(inDepthFile, 'rt') as tsvfin:
            while True:
                lines = tsvfin.readlines(ChunkSize)
                lines_queue.put(lines)
                if not lines:
                    for i in range(Nworkers):
                        lines_queue.put(b'\n\n')
                    break
    except KeyboardInterrupt:
        print('\n[!]Ctrl+C pressed.', file=sys.stderr, flush=True)
        for i in range(Nworkers):
            lines_queue.put(b'\n\n')
        pass
    #for results in ApplyResult:
        #(iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) = results.get()
    #for (iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) in MapResult.get():
    for (iRecordCnt, iMaxDepth, icDepthCnt, icDepthStat) in AsyncResult:
        RecordCnt += iRecordCnt
        if iMaxDepth > MaxDepth:
            MaxDepth = iMaxDepth
        for k in SamplesList:
            cDepthCnt[k].update(icDepthCnt[k])
            cDepthStat[k][0] += icDepthStat[k][0]
            cDepthStat[k][1] += icDepthStat[k][1]
    return RecordCnt, MaxDepth, cDepthCnt, cDepthStat

#def iStator(inQueue,inSamplesList):
def iStator(args):
    (inQueue, inSamplesList) = args
    import csv
    # Looking up things in global scope takes longer than looking up stuff in local scope. <https://stackoverflow.com/a/54645851/159695>
    cDepthCnt = {key: Counter() for key in inSamplesList}
    cDepthStat = {key: [0, 0] for key in inSamplesList}  # x and x^2
    RecordCnt = 0
    MaxDepth = 0
    for lines in iter(inQueue.get, b'\n\n'):
        try:
            tsvin = csv.DictReader(lines, delimiter='\t', fieldnames=('ChrID', 'Pos') + inSamplesList)
            for row in tsvin:
                #print(', '.join(row[col] for col in inSamplesList))
                RecordCnt += 1
                for k in inSamplesList:
                    theValue = int(row[k])
                    if theValue > MaxDepth:
                        MaxDepth = theValue
                    #DepthCnt[k][theValue] += 1   # PyPy3:30.54 ns, Python3:22.23 ns
                    #yDepthCnt[theValue][k] += 1  # PyPy3:30.47 ns, Python3:21.50 ns
                    cDepthCnt[k][theValue] += 1   # PyPy3:29.82 ns, Python3:30.61 ns
                    cDepthStat[k][0] += theValue
                    cDepthStat[k][1] += theValue * theValue
            #print(MaxDepth,DepthCnt)
        except KeyboardInterrupt:
            print('\n[!]Ctrl+C pressed.', file=sys.stderr, flush=True)
            pass
    #print('[!]{} Lines Read:[{}], MaxDepth is [{}].'.format(current_process().name, RecordCnt, MaxDepth), file=sys.stderr, flush=True)
    return RecordCnt, MaxDepth, cDepthCnt, cDepthStat

if __name__ == "__main__":
    main()  # time python3 ./samdepthplot.py t.tsv.gz 1





answered Mar 11 at 9:00 by Galaxy
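On the unresolved Ctrl+C problem: a common workaround (a sketch under my own simplified chunking, not a drop-in fix for the script above) is to start the pool workers with an initializer that ignores SIGINT, so only the parent process receives the KeyboardInterrupt; the parent then terminates the pool and writes out whatever has already been merged. The file path and chunk size are placeholders:

import signal
from collections import Counter
from itertools import islice
from multiprocessing import Pool


def init_worker():
    # workers ignore Ctrl+C; only the parent handles it
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def count_words(lines):
    return Counter(w for line in lines for w in line.split())


def iter_chunks(path, n=100_000):
    with open(path) as f:
        while True:
            chunk = list(islice(f, n))
            if not chunk:
                return
            yield chunk


if __name__ == '__main__':
    final = Counter()
    pool = Pool(4, initializer=init_worker)
    try:
        for partial in pool.imap_unordered(count_words, iter_chunks('huge.txt')):
            final.update(partial)
        pool.close()
    except KeyboardInterrupt:
        pool.terminate()   # stop the workers; keep whatever has been merged so far
    pool.join()
    print(final.most_common(10))   # partial counts survive an interrupt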


























