How to count word frequencies in a huge file concurrently?
I need to count the word frequencies in a 3 GB gzipped plain-text file of English sentences, which is about 30 GB when unzipped.
I have a single-threaded script using collections.Counter and gzip.open, and it takes hours to finish.
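A minimal sketch of that single-threaded baseline (the file name and whitespace tokenization are assumptions):

# Single-threaded baseline: gzip.open + collections.Counter over every line.
# 'sentences.txt.gz' and .split() tokenization are placeholders.
import gzip
from collections import Counter

word_freq = Counter()
with gzip.open('sentences.txt.gz', 'rt', encoding='utf-8') as fh:
    for line in fh:
        word_freq.update(line.split())

print(word_freq.most_common(10))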
Since reading the file line by line is much faster than splitting and counting, I am thinking about a producer-consumer flow: a file reader produces lines, several consumers do the splitting and counting, and at the end the Counters are merged to get the word occurrences.
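Merging the per-consumer Counters at the end is straightforward; a small sketch, assuming each consumer hands back its own Counter:

# Counters support in-place merging, so partial counts from the consumers
# can be folded into one total (partial_counters is a hypothetical list).
from collections import Counter

partial_counters = [Counter(a=2, b=1), Counter(a=1, c=5)]  # e.g. one per consumer
total = Counter()
for c in partial_counters:
    total.update(c)   # adds counts; equivalent to total += c
print(total)          # Counter({'c': 5, 'a': 3, 'b': 1})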
However, I cannot find an example of sending a queue to a ProcessPoolExecutor: the examples just map single items from a list. And there are only single-threaded examples for asyncio.Queue.
It is a huge file, so I cannot read the whole file into a list before counting, which rules out concurrent.futures.Executor.map; yet all the examples I have read start from a fixed list. The time to split and count one sentence is comparable to the time to fork a process, so I have to make each consumer process live longer. I do not think map can merge the Counters, so I cannot use chunksize > 1. That means I have to give the consumers a queue and keep them counting until the whole file is finished, but most examples only send one item to a consumer and use chunksize=1000 to reduce the number of forks.
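For comparison, this is roughly what the chunked Executor.map pattern looks like: each task receives a list of lines and returns one Counter for the parent to merge (chunk size, file name and tokenization are assumptions). Note that Executor.map submits the whole chunk generator eagerly, which is exactly the problem with a 30 GB input:

# Chunked map: each task gets a list of lines and returns one Counter.
# Caveat: Executor.map consumes the chunk generator up front, so the parent
# still buffers the whole file's worth of chunks for a 30 GB input.
import gzip
from collections import Counter
from concurrent.futures import ProcessPoolExecutor
from itertools import islice

def count_chunk(lines):
    c = Counter()
    for line in lines:
        c.update(line.split())
    return c

def chunks(fh, n=100000):
    while True:
        block = list(islice(fh, n))
        if not block:
            return
        yield block

if __name__ == '__main__':
    total = Counter()
    with gzip.open('sentences.txt.gz', 'rt') as fh, ProcessPoolExecutor() as ex:
        for partial in ex.map(count_chunk, chunks(fh)):
            total.update(partial)
    print(total.most_common(10))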
Would you write an example for me? I hope the code stays backward compatible with Python 3.5.3, since PyPy is faster.
My real case is for a more specific file format:
chr1 10011 141 0 157 4 41 50
chr1 10012 146 1 158 4 42 51
chr1 10013 150 0 163 4 43 53
chr1 10014 164 3 167 4 44 54
I need to build a histogram for each single column, from column 3 to 8. So I take word frequencies as an easier example.
My code is:
#!/usr/bin/env pypy3
import sys

SamplesList = ('D_Crick', 'D_Watson', 'Normal_Crick', 'Normal_Watson', 'D_WGS', 'Normal_WGS')

def main():
    import math

    if len(sys.argv) < 3 :
        print('Usage:',sys.argv[0],'<samtools.depth.gz> <out.tsv> [verbose=0]',file=sys.stderr,flush=True)
        exit(0)
    try:
        verbose = int(sys.argv[3])
    except: # `except IndexError:` and `except ValueError:`
        verbose = 0

    inDepthFile = sys.argv[1]
    outFile = sys.argv[2]
    print('From:[{}], To:[{}].\nVerbose: [{}].'.format(inDepthFile,outFile,verbose),file=sys.stderr,flush=True)
    RecordCnt,MaxDepth,cDepthCnt,cDepthStat = inStat(inDepthFile,verbose)
    for k in SamplesList:
        cDepthStat[k][2] = cDepthStat[k][0] / RecordCnt # E(X)
        cDepthStat[k][3] = cDepthStat[k][1] / RecordCnt # E(X^2)
        cDepthStat[k][4] = math.sqrt(cDepthStat[k][3] - cDepthStat[k][2]*cDepthStat[k][2]) # E(X^2)-E(X)^2
    tsvout = open(outFile, 'wt')
    print('#{}\t{}'.format('Depth','\t'.join(SamplesList)),file=tsvout)
    #RecordCntLength = len(str(RecordCnt))
    print( '#N={},SD:\t{}'.format(RecordCnt,'\t'.join(str(round(cDepthStat[col][4],1)) for col in SamplesList)),file=tsvout)
    for depth in range(0,MaxDepth+1):
        print( '{}\t{}'.format(depth,'\t'.join(str(cDepthCnt[col][depth]) for col in SamplesList)),file=tsvout)
    tsvout.close()
    pass

def inStat(inDepthFile,verbose):
    import gzip
    import csv
    from collections import Counter
    # Looking up things in global scope takes longer than looking up stuff in local scope. <https://stackoverflow.com/a/54645851/159695>
    cDepthCnt = {key:Counter() for key in SamplesList}
    cDepthStat = {key:[0,0,0,0,0] for key in SamplesList} # x and x^2
    RecordCnt = 0
    MaxDepth = 0
    with gzip.open(inDepthFile, 'rt') as tsvin:
        tsvin = csv.DictReader(tsvin, delimiter='\t', fieldnames=('ChrID','Pos')+SamplesList )
        try:
            for row in tsvin:
                RecordCnt += 1
                for k in SamplesList:
                    theValue = int(row[k])
                    if theValue > MaxDepth:
                        MaxDepth = theValue
                    cDepthCnt[k][theValue] += 1 # PyPy3:29.82 ns, Python3:30.61 ns
                    cDepthStat[k][0] += theValue
                    cDepthStat[k][1] += theValue * theValue
            #print(MaxDepth,DepthCnt)
        except KeyboardInterrupt:
            print('\n[!]Ctrl+C pressed.',file=sys.stderr,flush=True)
            pass
    print('[!]Lines Read:[{}], MaxDepth is [{}].'.format(RecordCnt,MaxDepth),file=sys.stderr,flush=True)
    return RecordCnt,MaxDepth,cDepthCnt,cDepthStat

if __name__ == "__main__":
    main() # time python3 ./samdepthplot.py t.tsv.gz 1
csv.DictReader takes most of the time.
My problem is that, although the gzip reader is fast and the csv reader is fast, I need to count billions of lines, and the csv reader is certainly SLOWER than the gzip reader.
So I need to spread lines to different worker processes that run the csv reader and do the downstream counting separately. A queue between one producer and many consumers is convenient for that.
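For rows as regular as these, a plain split can be noticeably cheaper than building a dict per row with csv.DictReader; a sketch of that substitution, assuming the column layout shown above and an example file name:

# Hypothetical replacement for the DictReader loop: split each tab-separated
# line by hand; fields[0] is ChrID, fields[1] is Pos, the rest are depths.
import gzip

SamplesList = ('D_Crick', 'D_Watson', 'Normal_Crick', 'Normal_Watson', 'D_WGS', 'Normal_WGS')

with gzip.open('samtools.depth.gz', 'rt') as tsvin:
    for line in tsvin:
        fields = line.rstrip('\n').split('\t')
        depths = [int(x) for x in fields[2:2 + len(SamplesList)]]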
Since I am using Python, not C, is there some abstracted wrapper for multiprocessing plus a queue? Is it possible to use ProcessPoolExecutor with the Queue class?
python parallel-processing queue process-pool
asked Mar 9 at 1:06, edited Mar 10 at 14:33 – Galaxy
I know you lightly touched on how you're doing this, but can you include the code you're using currently?
– Jab, Mar 9 at 1:09
I wonder if you might not get better performance by just using a shell pipeline? See "Command-line Tools can be 235x Faster than your Hadoop Cluster". This problem sounds like a great fit for xargs and uniq -c, with maybe some awk scripting to glue it all together.
– Daniel Pryden, Mar 9 at 1:12
Have you looked into using io.BufferedReader? As explained in "Reading & Writing GZIP Files Faster in Python".
– Jab, Mar 9 at 1:14
You could treat the gzipped file as a giant random-access list of lines without reading the whole thing into memory, using something similar to what's being done in this answer, only with an mmap instead of a temporary file (I have an un-posted version which does this). The memory map could then be passed to multiple concurrent subprocesses along with a starting line number and line count. Each subprocess could count the words in the section assigned to it and pass back a dictionary when finished. These dictionaries could all be merged together.
– martineau, Mar 9 at 2:09
I included my code now.
– Galaxy, Mar 9 at 3:34
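A minimal sketch of one way to add a bigger read buffer around the gzip stream, in the spirit of the io.BufferedReader comment above (file name, buffer size, and text decoding are assumptions):

# Wrap the gzip stream in a larger read buffer before decoding it as text.
import gzip
import io

with gzip.open('sentences.txt.gz', 'rb') as raw:
    buffered = io.BufferedReader(raw, buffer_size=4 * 1024 * 1024)
    for line in io.TextIOWrapper(buffered, encoding='utf-8'):
        pass  # split / count here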
5 Answers
I've never tested this code, but it should work.
The first thing is to check the number of lines:
f = 'myfile.txt'

def file_len(f):
    with open(f) as f:
        for i, l in enumerate(f):
            pass
    return i + 1

num_lines = file_len(f)
Then split the data in n partitions:
n = 8  # number of worker processes, for example
split_size = num_lines // n if num_lines // n > 0 else 1
parts = [x for x in range(0, num_lines, split_size)]
And now start the jobs:
from multiprocessing import Process
import linecache

jobs = []
for part in range(len(parts)):
    p = Process(target=function_here, args=('myfile.txt', parts[part], split_size))
    jobs.append(p)
    p.start()

for p in jobs:
    p.join()
An example of the function:
def function_here(your_file_name, line_number, split_size):
    for current_line in range(line_number, (line_number + split_size) + 1):
        print(linecache.getline(your_file_name, current_line))
Still, you will need to check the number of lines before doing any operation.
– kaihami, answered Mar 9 at 1:53, edited Mar 9 at 2:07
I am reading a gzip file and I prefer not to unzip it.
– Galaxy, Mar 9 at 3:40
I read that ProcessPoolExecutor is a simplified abstraction over multiprocessing. Is it simpler to use ProcessPoolExecutor?
– Galaxy, Mar 9 at 4:58
A 30 GB text file is big enough to put your question into the realm of Big Data, so to tackle this problem I suggest using Big Data tools like Hadoop and Spark. What you explained as a "producer-consumer flow" is basically what the MapReduce algorithm is designed for. Counting word frequencies is a typical MapReduce problem; look it up and you will find tons of examples.
– LoMaPh, answered Mar 9 at 2:26
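For reference, a minimal PySpark sketch of the MapReduce-style word count this answer describes (it assumes a local PySpark setup and an example input path; note that a single .gz file is not splittable, so Spark reads it as one partition unless it is decompressed first):

# Classic MapReduce-style word count in PySpark (paths and setup are assumptions).
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('wordcount').getOrCreate()
counts = (spark.sparkContext
          .textFile('sentences.txt.gz')   # gzip is read transparently, but as a single partition
          .flatMap(lambda line: line.split())
          .map(lambda word: (word, 1))
          .reduceByKey(lambda a, b: a + b))
counts.saveAsTextFile('word_counts_out')
spark.stop()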
The idea is to break the huge file into smaller files, invoke many workers that each do the counting job and return a Counter, and finally merge the counters.
from itertools import islice
from multiprocessing import Pool
from collections import Counter
import os

NUM_OF_LINES = 3
INPUT_FILE = 'huge.txt'
POOL_SIZE = 10

def slice_huge_file():
    cnt = 0
    with open(INPUT_FILE) as f:
        while True:
            next_n_lines = list(islice(f, NUM_OF_LINES))
            cnt += 1
            if not next_n_lines:
                break
            with open('sub_huge_{}.txt'.format(cnt), 'w') as out:
                out.writelines(next_n_lines)

def count_file_words(input_file):
    with open(input_file, 'r') as f:
        return Counter(f.read().split())  # count whitespace-separated words in this slice

if __name__ == '__main__':
    slice_huge_file()
    pool = Pool(POOL_SIZE)
    sub_files = [os.path.join('.', f) for f in os.listdir('.') if f.startswith('sub_huge')]
    results = pool.map(count_file_words, sub_files)
    final_counter = Counter()
    for counter in results:
        final_counter += counter
    print(final_counter)
– balderman, answered Mar 9 at 16:00
Just some pseudocode:
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
import traceback

WORKER_POOL_SIZE = 10  # you should set this as the number of your processes
QUEUE_SIZE = 100       # 10 times your pool size is good enough

def main():
    with Manager() as manager:
        q = manager.Queue(QUEUE_SIZE)

        # init worker pool
        executor = ProcessPoolExecutor(max_workers=WORKER_POOL_SIZE)
        workers_pool = [executor.submit(worker, i, q) for i in range(WORKER_POOL_SIZE)]

        # start producer
        run_producer(q)

        # wait until done
        for f in workers_pool:
            try:
                f.result()
            except Exception:
                traceback.print_exc()

def run_producer(q):
    try:
        with open("your file path") as fp:
            for line in fp:
                q.put(line)
    except Exception:
        traceback.print_exc()
    finally:
        q.put(None)

def worker(i, q):
    while 1:
        line = q.get()
        if line is None:
            print(f'worker {i} is done')
            q.put(None)
            return

        # do something with this line
        # ...
– Laisky, answered Mar 11 at 7:17, edited Mar 11 at 8:22
How do I get the result after a worker is done? I got many <Future at 0x1078d3780 state=running> but no results.
– Galaxy, Mar 14 at 3:23
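The pseudocode above never returns anything from worker, so the futures only ever carry None. One way to get per-worker results back, sketched under the assumption that each worker builds and returns a collections.Counter which the parent then merges:

# Hypothetical adaptation of the worker: accumulate a Counter and return it,
# so that each future's result() carries that worker's partial counts.
from collections import Counter

def worker(i, q):
    counts = Counter()
    while True:
        line = q.get()
        if line is None:
            q.put(None)      # pass the sentinel on so the other workers stop too
            return counts    # becomes the future's result() in main()
        counts.update(line.split())

# in main(), after run_producer(q) has finished:
#     total = Counter()
#     for f in workers_pool:
#         total.update(f.result())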
I learned the multiprocessing lib over the weekend. Stopping on Ctrl+C and writing out the current result still does not work, but the main function is fine now.
#!/usr/bin/env pypy3
import sys
from collections import Counter
from multiprocessing import Pool, Process, Manager, current_process, freeze_support

SamplesList = ('D_Crick', 'D_Watson', 'Normal_Crick', 'Normal_Watson', 'D_WGS', 'Normal_WGS')

ChunkSize = 1024 * 128
verbose = 0
Nworkers = 16

def main():
    import math

    if len(sys.argv) < 3 :
        print('Usage:',sys.argv[0],'<samtools.depth.gz> <out.tsv> [verbose=0]',file=sys.stderr,flush=True)
        exit(0)
    try:
        verbose = int(sys.argv[3])
    except: # `except IndexError:` and `except ValueError:`
        verbose = 0

    inDepthFile = sys.argv[1]
    outFile = sys.argv[2]
    print('From:[{}], To:[{}].\nVerbose: [{}].'.format(inDepthFile,outFile,verbose),file=sys.stderr,flush=True)
    RecordCnt,MaxDepth,cDepthCnt,cDepthStat = CallStat(inDepthFile)
    for k in SamplesList:
        cDepthStat[k][2] = cDepthStat[k][0] / RecordCnt # E(X)
        cDepthStat[k][3] = cDepthStat[k][1] / RecordCnt # E(X^2)
        cDepthStat[k][4] = math.sqrt(cDepthStat[k][3] - cDepthStat[k][2]*cDepthStat[k][2]) # E(X^2)-E(X)^2
    tsvout = open(outFile, 'wt')
    print('#{}\t{}'.format('Depth','\t'.join(SamplesList)),file=tsvout)
    #RecordCntLength = len(str(RecordCnt))
    print( '#N={},SD:\t{}'.format(RecordCnt,'\t'.join(str(round(cDepthStat[col][4],1)) for col in SamplesList)),file=tsvout)
    for depth in range(0,MaxDepth+1):
        #print( '{}\t{}'.format(depth,'\t'.join(str(DepthCnt[col][depth]) for col in SamplesList)) )
        #print( '{}\t{}'.format(depth,'\t'.join(str(yDepthCnt[depth][col]) for col in SamplesList)) )
        print( '{}\t{}'.format(depth,'\t'.join(str(cDepthCnt[col][depth]) for col in SamplesList)),file=tsvout)
        #pass
    #print('#MaxDepth={}'.format(MaxDepth),file=tsvout)
    tsvout.close()
    pass

def CallStat(inDepthFile):
    import gzip
    import itertools

    RecordCnt = 0
    MaxDepth = 0
    cDepthCnt = {key:Counter() for key in SamplesList}
    cDepthStat = {key:[0,0,0,0,0] for key in SamplesList} # x and x^2
    #lines_queue = Queue()
    manager = Manager()
    lines_queue = manager.Queue()
    stater_pool = Pool(Nworkers)
    TASKS = itertools.repeat((lines_queue,SamplesList),Nworkers)
    #ApplyResult = [stater_pool.apply_async(iStator,x) for x in TASKS]
    #MapResult = stater_pool.map_async(iStator,TASKS,1)
    AsyncResult = stater_pool.imap_unordered(iStator,TASKS,1)
    try:
        with gzip.open(inDepthFile, 'rt') as tsvfin:
            while True:
                lines = tsvfin.readlines(ChunkSize)
                lines_queue.put(lines)
                if not lines:
                    for i in range(Nworkers):
                        lines_queue.put(b'\n\n')
                    break
    except KeyboardInterrupt:
        print('\n[!]Ctrl+C pressed.',file=sys.stderr,flush=True)
        for i in range(Nworkers):
            lines_queue.put(b'\n\n')
        pass

    #for results in ApplyResult:
        #(iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) = results.get()
    #for (iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) in MapResult.get():
    for (iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) in AsyncResult:
        RecordCnt += iRecordCnt
        if iMaxDepth > MaxDepth:
            MaxDepth = iMaxDepth
        for k in SamplesList:
            cDepthCnt[k].update(icDepthCnt[k])
            cDepthStat[k][0] += icDepthStat[k][0]
            cDepthStat[k][1] += icDepthStat[k][1]
    return RecordCnt,MaxDepth,cDepthCnt,cDepthStat

#def iStator(inQueue,inSamplesList):
def iStator(args):
    (inQueue,inSamplesList) = args
    import csv
    # Looking up things in global scope takes longer than looking up stuff in local scope. <https://stackoverflow.com/a/54645851/159695>
    cDepthCnt = {key:Counter() for key in inSamplesList}
    cDepthStat = {key:[0,0] for key in inSamplesList} # x and x^2
    RecordCnt = 0
    MaxDepth = 0
    for lines in iter(inQueue.get, b'\n\n'):
        try:
            tsvin = csv.DictReader(lines, delimiter='\t', fieldnames=('ChrID','Pos')+inSamplesList )
            for row in tsvin:
                #print(', '.join(row[col] for col in inSamplesList))
                RecordCnt += 1
                for k in inSamplesList:
                    theValue = int(row[k])
                    if theValue > MaxDepth:
                        MaxDepth = theValue
                    #DepthCnt[k][theValue] += 1 # PyPy3:30.54 ns, Python3:22.23 ns
                    #yDepthCnt[theValue][k] += 1 # PyPy3:30.47 ns, Python3:21.50 ns
                    cDepthCnt[k][theValue] += 1 # PyPy3:29.82 ns, Python3:30.61 ns
                    cDepthStat[k][0] += theValue
                    cDepthStat[k][1] += theValue * theValue
            #print(MaxDepth,DepthCnt)
        except KeyboardInterrupt:
            print('\n[!]Ctrl+C pressed.',file=sys.stderr,flush=True)
        pass
    #print('[!]{} Lines Read:[{}], MaxDepth is [{}].'.format(current_process().name,RecordCnt,MaxDepth),file=sys.stderr,flush=True)
    return RecordCnt,MaxDepth,cDepthCnt,cDepthStat

if __name__ == "__main__":
    main() # time python3 ./samdepthplot.py t.tsv.gz 1
add a comment |
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f55073002%2fhow-to-count-word-frequencies-in-a-huge-file-concurrently%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
5 Answers
5
active
oldest
votes
5 Answers
5
active
oldest
votes
active
oldest
votes
active
oldest
votes
I've never tested this code, but should work.
The first thing is to check the number of lines
f =('myfile.txt')
def file_len(f):
with open(f) as f:
for i, l in enumerate(f):
pass
return i + 1
num_lines = file_len(f)
split the data in n partitions
n = threads (8 for example)
split_size = num_lines//n if num_lines//n > 0 else 1
parts = [x for x in range(0, num_lines, split_size)]
And now start the jobs:
from multiprocessing import Process
import linecache
jobs = []
for part in range(len(parts)):
p = Process(target = function_here, args = ('myfile.txt', parts[part], split_size))
jobs.append(p)
p.start()
for p in jobs:
p.join()
An example of the function
def function_here(your_file_name, line_number, split_size):
for current_line in range(line_number, (line_number+split_size)+1):
print( linecache.getline(your_file_name, current_line))
Still, you will need to check the number of lines before doing any operation
I am reading a gzip file and I prefer not to unzip it.
– Galaxy
Mar 9 at 3:40
I read ProcessPoolExecutor is a simplized abstract of multiprocessing. Is this simpler to use ProcessPoolExecutor?
– Galaxy
Mar 9 at 4:58
add a comment |
I've never tested this code, but should work.
The first thing is to check the number of lines
f =('myfile.txt')
def file_len(f):
with open(f) as f:
for i, l in enumerate(f):
pass
return i + 1
num_lines = file_len(f)
split the data in n partitions
n = threads (8 for example)
split_size = num_lines//n if num_lines//n > 0 else 1
parts = [x for x in range(0, num_lines, split_size)]
And now start the jobs:
from multiprocessing import Process
import linecache
jobs = []
for part in range(len(parts)):
p = Process(target = function_here, args = ('myfile.txt', parts[part], split_size))
jobs.append(p)
p.start()
for p in jobs:
p.join()
An example of the function
def function_here(your_file_name, line_number, split_size):
for current_line in range(line_number, (line_number+split_size)+1):
print( linecache.getline(your_file_name, current_line))
Still, you will need to check the number of lines before doing any operation
I am reading a gzip file and I prefer not to unzip it.
– Galaxy
Mar 9 at 3:40
I read ProcessPoolExecutor is a simplized abstract of multiprocessing. Is this simpler to use ProcessPoolExecutor?
– Galaxy
Mar 9 at 4:58
add a comment |
I've never tested this code, but should work.
The first thing is to check the number of lines
f =('myfile.txt')
def file_len(f):
with open(f) as f:
for i, l in enumerate(f):
pass
return i + 1
num_lines = file_len(f)
split the data in n partitions
n = threads (8 for example)
split_size = num_lines//n if num_lines//n > 0 else 1
parts = [x for x in range(0, num_lines, split_size)]
And now start the jobs:
from multiprocessing import Process
import linecache
jobs = []
for part in range(len(parts)):
p = Process(target = function_here, args = ('myfile.txt', parts[part], split_size))
jobs.append(p)
p.start()
for p in jobs:
p.join()
An example of the function
def function_here(your_file_name, line_number, split_size):
for current_line in range(line_number, (line_number+split_size)+1):
print( linecache.getline(your_file_name, current_line))
Still, you will need to check the number of lines before doing any operation
I've never tested this code, but should work.
The first thing is to check the number of lines
f =('myfile.txt')
def file_len(f):
with open(f) as f:
for i, l in enumerate(f):
pass
return i + 1
num_lines = file_len(f)
split the data in n partitions
n = threads (8 for example)
split_size = num_lines//n if num_lines//n > 0 else 1
parts = [x for x in range(0, num_lines, split_size)]
And now start the jobs:
from multiprocessing import Process
import linecache
jobs = []
for part in range(len(parts)):
p = Process(target = function_here, args = ('myfile.txt', parts[part], split_size))
jobs.append(p)
p.start()
for p in jobs:
p.join()
An example of the function
def function_here(your_file_name, line_number, split_size):
for current_line in range(line_number, (line_number+split_size)+1):
print( linecache.getline(your_file_name, current_line))
Still, you will need to check the number of lines before doing any operation
edited Mar 9 at 2:07
answered Mar 9 at 1:53
kaihamikaihami
312411
312411
I am reading a gzip file and I prefer not to unzip it.
– Galaxy
Mar 9 at 3:40
I read ProcessPoolExecutor is a simplized abstract of multiprocessing. Is this simpler to use ProcessPoolExecutor?
– Galaxy
Mar 9 at 4:58
add a comment |
I am reading a gzip file and I prefer not to unzip it.
– Galaxy
Mar 9 at 3:40
I read ProcessPoolExecutor is a simplized abstract of multiprocessing. Is this simpler to use ProcessPoolExecutor?
– Galaxy
Mar 9 at 4:58
I am reading a gzip file and I prefer not to unzip it.
– Galaxy
Mar 9 at 3:40
I am reading a gzip file and I prefer not to unzip it.
– Galaxy
Mar 9 at 3:40
I read ProcessPoolExecutor is a simplized abstract of multiprocessing. Is this simpler to use ProcessPoolExecutor?
– Galaxy
Mar 9 at 4:58
I read ProcessPoolExecutor is a simplized abstract of multiprocessing. Is this simpler to use ProcessPoolExecutor?
– Galaxy
Mar 9 at 4:58
add a comment |
A 30 GB text file is big enough to put your question into the realm of Big-Data. So to tackle this problem I suggest using Big-Data tools like Hadoop and Spark. What you explained as a "producer-consumer flow" is basically what MapReduce
algorithm is designed for. The word count frequency is a typical MapReduce problem. Look it up, you will find tons of examples.
add a comment |
A 30 GB text file is big enough to put your question into the realm of Big-Data. So to tackle this problem I suggest using Big-Data tools like Hadoop and Spark. What you explained as a "producer-consumer flow" is basically what MapReduce
algorithm is designed for. The word count frequency is a typical MapReduce problem. Look it up, you will find tons of examples.
add a comment |
A 30 GB text file is big enough to put your question into the realm of Big-Data. So to tackle this problem I suggest using Big-Data tools like Hadoop and Spark. What you explained as a "producer-consumer flow" is basically what MapReduce
algorithm is designed for. The word count frequency is a typical MapReduce problem. Look it up, you will find tons of examples.
A 30 GB text file is big enough to put your question into the realm of Big-Data. So to tackle this problem I suggest using Big-Data tools like Hadoop and Spark. What you explained as a "producer-consumer flow" is basically what MapReduce
algorithm is designed for. The word count frequency is a typical MapReduce problem. Look it up, you will find tons of examples.
answered Mar 9 at 2:26
LoMaPhLoMaPh
3311714
3311714
add a comment |
add a comment |
The idea is to break the huge file into smaller files. Invoke many workers that will do the count job and return a Counter.
Finally merge the counters.
from itertools import islice
from multiprocessing import Pool
from collections import Counter
import os
NUM_OF_LINES = 3
INPUT_FILE = 'huge.txt'
POOL_SIZE = 10
def slice_huge_file():
cnt = 0
with open(INPUT_FILE) as f:
while True:
next_n_lines = list(islice(f, NUM_OF_LINES))
cnt += 1
if not next_n_lines:
break
with open('sub_huge_.txt'.format(cnt), 'w') as out:
out.writelines(next_n_lines)
def count_file_words(input_file):
with open(input_file, 'r') as f:
return Counter([w.strip() for w in f.readlines()])
if __name__ == '__main__':
slice_huge_file()
pool = Pool(POOL_SIZE)
sub_files = [os.path.join('.',f) for f in os.listdir('.') if f.startswith('sub_huge')]
results = pool.map(count_file_words, sub_files)
final_counter = Counter()
for counter in results:
final_counter += counter
print(final_counter)
add a comment |
The idea is to break the huge file into smaller files. Invoke many workers that will do the count job and return a Counter.
Finally merge the counters.
from itertools import islice
from multiprocessing import Pool
from collections import Counter
import os
NUM_OF_LINES = 3
INPUT_FILE = 'huge.txt'
POOL_SIZE = 10
def slice_huge_file():
cnt = 0
with open(INPUT_FILE) as f:
while True:
next_n_lines = list(islice(f, NUM_OF_LINES))
cnt += 1
if not next_n_lines:
break
with open('sub_huge_.txt'.format(cnt), 'w') as out:
out.writelines(next_n_lines)
def count_file_words(input_file):
with open(input_file, 'r') as f:
return Counter([w.strip() for w in f.readlines()])
if __name__ == '__main__':
slice_huge_file()
pool = Pool(POOL_SIZE)
sub_files = [os.path.join('.',f) for f in os.listdir('.') if f.startswith('sub_huge')]
results = pool.map(count_file_words, sub_files)
final_counter = Counter()
for counter in results:
final_counter += counter
print(final_counter)
add a comment |
The idea is to break the huge file into smaller files. Invoke many workers that will do the count job and return a Counter.
Finally merge the counters.
from itertools import islice
from multiprocessing import Pool
from collections import Counter
import os
NUM_OF_LINES = 3
INPUT_FILE = 'huge.txt'
POOL_SIZE = 10
def slice_huge_file():
cnt = 0
with open(INPUT_FILE) as f:
while True:
next_n_lines = list(islice(f, NUM_OF_LINES))
cnt += 1
if not next_n_lines:
break
with open('sub_huge_.txt'.format(cnt), 'w') as out:
out.writelines(next_n_lines)
def count_file_words(input_file):
with open(input_file, 'r') as f:
return Counter([w.strip() for w in f.readlines()])
if __name__ == '__main__':
slice_huge_file()
pool = Pool(POOL_SIZE)
sub_files = [os.path.join('.',f) for f in os.listdir('.') if f.startswith('sub_huge')]
results = pool.map(count_file_words, sub_files)
final_counter = Counter()
for counter in results:
final_counter += counter
print(final_counter)
The idea is to break the huge file into smaller files. Invoke many workers that will do the count job and return a Counter.
Finally merge the counters.
from itertools import islice
from multiprocessing import Pool
from collections import Counter
import os
NUM_OF_LINES = 3
INPUT_FILE = 'huge.txt'
POOL_SIZE = 10
def slice_huge_file():
cnt = 0
with open(INPUT_FILE) as f:
while True:
next_n_lines = list(islice(f, NUM_OF_LINES))
cnt += 1
if not next_n_lines:
break
with open('sub_huge_.txt'.format(cnt), 'w') as out:
out.writelines(next_n_lines)
def count_file_words(input_file):
with open(input_file, 'r') as f:
return Counter([w.strip() for w in f.readlines()])
if __name__ == '__main__':
slice_huge_file()
pool = Pool(POOL_SIZE)
sub_files = [os.path.join('.',f) for f in os.listdir('.') if f.startswith('sub_huge')]
results = pool.map(count_file_words, sub_files)
final_counter = Counter()
for counter in results:
final_counter += counter
print(final_counter)
answered Mar 9 at 16:00
baldermanbalderman
1,84111419
1,84111419
add a comment |
add a comment |
just some pseudocode:
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
import traceback
WORKER_POOL_SIZE = 10 # you should set this as the number of your processes
QUEUE_SIZE = 100 # 10 times to your pool size is good enough
def main():
with Manager() as manager:
q = manager.Queue(QUEUE_SIZE)
# init worker pool
executor = ProcessPoolExecutor(max_workers=WORKER_POOL_SIZE)
workers_pool = [executor.submit(worker, i, q) for i in range(WORKER_POOL_SIZE)]
# start producer
run_producer(q)
# wait to done
for f in workers_pool:
try:
f.result()
except Exception:
traceback.print_exc()
def run_producer(q):
try:
with open("your file path") as fp:
for line in fp:
q.put(line)
except Exception:
traceback.print_exc()
finally:
q.put(None)
def worker(i, q):
while 1:
line = q.get()
if line is None:
print(f'worker i is done')
q.put(None)
return
# do something with this line
# ...
How to get the result after worker is done ? I got many<Future at 0x1078d3780 state=running>
but no results.
– Galaxy
Mar 14 at 3:23
add a comment |
just some pseudocode:
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
import traceback
WORKER_POOL_SIZE = 10 # you should set this as the number of your processes
QUEUE_SIZE = 100 # 10 times to your pool size is good enough
def main():
with Manager() as manager:
q = manager.Queue(QUEUE_SIZE)
# init worker pool
executor = ProcessPoolExecutor(max_workers=WORKER_POOL_SIZE)
workers_pool = [executor.submit(worker, i, q) for i in range(WORKER_POOL_SIZE)]
# start producer
run_producer(q)
# wait to done
for f in workers_pool:
try:
f.result()
except Exception:
traceback.print_exc()
def run_producer(q):
try:
with open("your file path") as fp:
for line in fp:
q.put(line)
except Exception:
traceback.print_exc()
finally:
q.put(None)
def worker(i, q):
while 1:
line = q.get()
if line is None:
print(f'worker i is done')
q.put(None)
return
# do something with this line
# ...
How to get the result after worker is done ? I got many<Future at 0x1078d3780 state=running>
but no results.
– Galaxy
Mar 14 at 3:23
add a comment |
just some pseudocode:
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
import traceback
WORKER_POOL_SIZE = 10 # you should set this as the number of your processes
QUEUE_SIZE = 100 # 10 times to your pool size is good enough
def main():
with Manager() as manager:
q = manager.Queue(QUEUE_SIZE)
# init worker pool
executor = ProcessPoolExecutor(max_workers=WORKER_POOL_SIZE)
workers_pool = [executor.submit(worker, i, q) for i in range(WORKER_POOL_SIZE)]
# start producer
run_producer(q)
# wait to done
for f in workers_pool:
try:
f.result()
except Exception:
traceback.print_exc()
def run_producer(q):
try:
with open("your file path") as fp:
for line in fp:
q.put(line)
except Exception:
traceback.print_exc()
finally:
q.put(None)
def worker(i, q):
while 1:
line = q.get()
if line is None:
print(f'worker i is done')
q.put(None)
return
# do something with this line
# ...
just some pseudocode:
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager
import traceback
WORKER_POOL_SIZE = 10 # you should set this as the number of your processes
QUEUE_SIZE = 100 # 10 times to your pool size is good enough
def main():
with Manager() as manager:
q = manager.Queue(QUEUE_SIZE)
# init worker pool
executor = ProcessPoolExecutor(max_workers=WORKER_POOL_SIZE)
workers_pool = [executor.submit(worker, i, q) for i in range(WORKER_POOL_SIZE)]
# start producer
run_producer(q)
# wait to done
for f in workers_pool:
try:
f.result()
except Exception:
traceback.print_exc()
def run_producer(q):
try:
with open("your file path") as fp:
for line in fp:
q.put(line)
except Exception:
traceback.print_exc()
finally:
q.put(None)
def worker(i, q):
while 1:
line = q.get()
if line is None:
print(f'worker i is done')
q.put(None)
return
# do something with this line
# ...
edited Mar 11 at 8:22
answered Mar 11 at 7:17
LaiskyLaisky
465
465
How to get the result after worker is done ? I got many<Future at 0x1078d3780 state=running>
but no results.
– Galaxy
Mar 14 at 3:23
add a comment |
How to get the result after worker is done ? I got many<Future at 0x1078d3780 state=running>
but no results.
– Galaxy
Mar 14 at 3:23
How to get the result after worker is done ? I got many
<Future at 0x1078d3780 state=running>
but no results.– Galaxy
Mar 14 at 3:23
How to get the result after worker is done ? I got many
<Future at 0x1078d3780 state=running>
but no results.– Galaxy
Mar 14 at 3:23
add a comment |
I have learned the multiprocessing lib on weekend.
The stop on Ctrl+C and write current result function is still not working.
The main function is fine now.
#!/usr/bin/env pypy3
import sys
from collections import Counter
from multiprocessing import Pool, Process, Manager, current_process, freeze_support
SamplesList = ('D_Crick', 'D_Watson', 'Normal_Crick', 'Normal_Watson', 'D_WGS', 'Normal_WGS')
ChunkSize = 1024 * 128
verbose = 0
Nworkers = 16
def main():
import math
if len(sys.argv) < 3 :
print('Usage:',sys.argv[0],'<samtools.depth.gz> <out.tsv> [verbose=0]',file=sys.stderr,flush=True)
exit(0)
try:
verbose = int(sys.argv[3])
except: # `except IndexError:` and `except ValueError:`
verbose = 0
inDepthFile = sys.argv[1]
outFile = sys.argv[2]
print('From:[], To:[].nVerbose: [].'.format(inDepthFile,outFile,verbose),file=sys.stderr,flush=True)
RecordCnt,MaxDepth,cDepthCnt,cDepthStat = CallStat(inDepthFile)
for k in SamplesList:
cDepthStat[k][2] = cDepthStat[k][0] / RecordCnt # E(X)
cDepthStat[k][3] = cDepthStat[k][1] / RecordCnt # E(X^2)
cDepthStat[k][4] = math.sqrt(cDepthStat[k][3] - cDepthStat[k][2]*cDepthStat[k][2]) # E(X^2)-E(X)^2
tsvout = open(outFile, 'wt')
print('#t'.format('Depth','t'.join(SamplesList)),file=tsvout)
#RecordCntLength = len(str(RecordCnt))
print( '#N=,SD:t'.format(RecordCnt,'t'.join(str(round(cDepthStat[col][4],1)) for col in SamplesList)),file=tsvout)
for depth in range(0,MaxDepth+1):
#print( 't'.format(depth,'t'.join(str(DepthCnt[col][depth]) for col in SamplesList)) )
#print( 't'.format(depth,'t'.join(str(yDepthCnt[depth][col]) for col in SamplesList)) )
print( 't'.format(depth,'t'.join(str(cDepthCnt[col][depth]) for col in SamplesList)),file=tsvout)
#pass
#print('#MaxDepth='.format(MaxDepth),file=tsvout)
tsvout.close()
pass
def CallStat(inDepthFile):
import gzip
import itertools
RecordCnt = 0
MaxDepth = 0
cDepthCnt = key:Counter() for key in SamplesList
cDepthStat = key:[0,0,0,0,0] for key in SamplesList # x and x^2
#lines_queue = Queue()
manager = Manager()
lines_queue = manager.Queue()
stater_pool = Pool(Nworkers)
TASKS = itertools.repeat((lines_queue,SamplesList),Nworkers)
#ApplyResult = [stater_pool.apply_async(iStator,x) for x in TASKS]
#MapResult = stater_pool.map_async(iStator,TASKS,1)
AsyncResult = stater_pool.imap_unordered(iStator,TASKS,1)
try:
with gzip.open(inDepthFile, 'rt') as tsvfin:
while True:
lines = tsvfin.readlines(ChunkSize)
lines_queue.put(lines)
if not lines:
for i in range(Nworkers):
lines_queue.put(b'nn')
break
except KeyboardInterrupt:
print('n[!]Ctrl+C pressed.',file=sys.stderr,flush=True)
for i in range(Nworkers):
lines_queue.put(b'nn')
pass
#for results in ApplyResult:
#(iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) = results.get()
#for (iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) in MapResult.get():
for (iRecordCnt,iMaxDepth,icDepthCnt,icDepthStat) in AsyncResult:
RecordCnt += iRecordCnt
if iMaxDepth > MaxDepth:
MaxDepth = iMaxDepth
for k in SamplesList:
cDepthCnt[k].update(icDepthCnt[k])
cDepthStat[k][0] += icDepthStat[k][0]
cDepthStat[k][1] += icDepthStat[k][1]
return RecordCnt,MaxDepth,cDepthCnt,cDepthStat
#def iStator(inQueue,inSamplesList):
def iStator(args):
(inQueue,inSamplesList) = args
import csv
# Looking up things in global scope takes longer then looking up stuff in local scope. <https://stackoverflow.com/a/54645851/159695>
cDepthCnt = key:Counter() for key in inSamplesList
cDepthStat = key:[0,0] for key in inSamplesList # x and x^2
RecordCnt = 0
MaxDepth = 0
for lines in iter(inQueue.get, b'nn'):
try:
tsvin = csv.DictReader(lines, delimiter='t', fieldnames=('ChrID','Pos')+inSamplesList )
for row in tsvin:
#print(', '.join(row[col] for col in inSamplesList))
RecordCnt += 1
for k in inSamplesList:
theValue = int(row[k])
if theValue > MaxDepth:
MaxDepth = theValue
#DepthCnt[k][theValue] += 1 # PyPy3:30.54 ns, Python3:22.23 ns
#yDepthCnt[theValue][k] += 1 # PyPy3:30.47 ns, Python3:21.50 ns
cDepthCnt[k][theValue] += 1 # PyPy3:29.82 ns, Python3:30.61 ns
cDepthStat[k][0] += theValue
cDepthStat[k][1] += theValue * theValue
#print(MaxDepth,DepthCnt)
except KeyboardInterrupt:
print('n[!]Ctrl+C pressed.',file=sys.stderr,flush=True)
pass
#print('[!] Lines Read:[], MaxDepth is [].'.format(current_process().name,RecordCnt,MaxDepth),file=sys.stderr,flush=True)
return RecordCnt,MaxDepth,cDepthCnt,cDepthStat
if __name__ == "__main__":
main() # time python3 ./samdepthplot.py t.tsv.gz 1
answered Mar 11 at 9:00 by Galaxy
I know you lightly touched on how you're doing this, but can you include the code you're using currently? – Jab, Mar 9 at 1:09
I wonder if you might not get better performance by just using a shell pipeline? See Command-line Tools can be 235x Faster than your Hadoop Cluster. This problem sounds like a great fit for xargs and uniq -c, with maybe some awk scripting to glue it all together. – Daniel Pryden, Mar 9 at 1:12
Have you looked into using io.BufferedReader? As explained in Reading & Writing GZIP Files Faster in Python. – Jab, Mar 9 at 1:14
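A minimal sketch of that suggestion, assuming the same kind of gzipped text input (the file name is an assumption). On recent Python 3 releases gzip.open already buffers reads internally, so the gain may be modest:

import gzip
import io
from collections import Counter

word_counts = Counter()
# Wrapping the binary GzipFile in io.BufferedReader batches the many small
# reads that plain line iteration would otherwise issue to the decompressor.
with io.BufferedReader(gzip.open('huge.txt.gz', 'rb')) as fh:
    for raw_line in fh:
        word_counts.update(raw_line.decode('utf-8', 'replace').split())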
You could treat the gzipped file as a giant random-access list of lines without reading the whole thing into memory, using something similar to what's being done in this answer, only with a mmap instead of a temporary file (I have an un-posted version which does this). The memory-map could then be passed to multiple concurrent subprocesses along with a starting line number and line count. Each subprocess could count the words in the section assigned to it and pass back a dictionary when finished. These dictionaries could be all merged together. – martineau, Mar 9 at 2:09
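A minimal sketch of the mmap idea, assuming the data has already been decompressed to a plain-text file on disk (mmap cannot seek inside the gzip stream itself). It splits on byte offsets snapped to newlines rather than on line numbers, and the helper names are illustrative, not martineau's un-posted version:

import mmap
import os
import sys
from collections import Counter
from multiprocessing import Pool

def split_offsets(path, nparts):
    # Cut the file into nparts byte ranges, snapping each cut to the next newline.
    size = os.path.getsize(path)
    cuts = [0]
    with open(path, 'rb') as fh, mmap.mmap(fh.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        for i in range(1, nparts):
            pos = mm.find(b'\n', size * i // nparts)
            cuts.append(size if pos == -1 else pos + 1)
    cuts.append(size)
    return list(zip(cuts, cuts[1:]))

def count_range(args):
    # Each worker re-maps the file and counts words only in its [start, end) slice.
    path, start, end = args
    counts = Counter()
    with open(path, 'rb') as fh, mmap.mmap(fh.fileno(), 0, access=mmap.ACCESS_READ) as mm:
        mm.seek(start)
        while mm.tell() < end:
            counts.update(mm.readline().decode('utf-8', 'replace').split())
    return counts

if __name__ == '__main__':
    path, nworkers = sys.argv[1], 4
    tasks = [(path, s, e) for s, e in split_offsets(path, nworkers)]
    total = Counter()
    with Pool(nworkers) as pool:
        for partial in pool.imap_unordered(count_range, tasks):
            total.update(partial)
    print(total.most_common(10))

Because every worker maps the same file read-only, the operating system shares the pages between processes instead of copying the text through a queue, which is the main appeal of this approach.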
I included my code now. – Galaxy, Mar 9 at 3:34