1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
| import functools
import os
import sys
import time
import pandas as pd
from pymongo import MongoClient
def calc_rs(rs_no):
snp_info = {
"snp": rs_no,
"gg": {
},
"g": {
},
"total": 0,
"hits": 0
}
client = MongoClient()
db = client.test
collection = db.snp_info
snp_info["total"] = collection.count()
cursor = collection.find({rs_no: {"$ne": None}}, {rs_no: 1})
counts = cursor.count()
client.close()
if counts:
snp_info["hits"] = counts
else:
sys.exit("{} not found.".format(rs_no))
lst = list(map(lambda x: x[rs_no], cursor))
for i in lst:
gg = snp_info["gg"]
if gg.get(i):
gg[i] += 1
else:
gg[i] = 1
g_str = "".join(lst)
for i in set(g_str):
snp_info["g"][i] = g_str.count(i)
return snp_info
def calc_sp(sp):
client = MongoClient()
db = client.test
collection = db.snp_info
cursor = collection.find_one({"Sample_ID": sp}, {"_id": 0})
client.close()
if not cursor:
sys.exit("{} not found.".format(sp))
return {k: v for k, v in cursor.items() if v}
def usage():
sys.exit("Usage: python {} <snp_no | sample_id>".format(os.path.basename(sys.argv[0])))
def query_item(s):
st = s.lower()
print("-"*30)
if st.startswith("rs"):
snp = calc_rs(st)
print("RS No.\t{}".format(snp["snp"]))
print("TOTAL\t{}".format(snp["total"]))
hits = snp["hits"]
print("HITS\t{}".format(hits))
print("")
print("基因型\t计数\t频率")
gg = snp["gg"]
for i in gg:
print("{}\t{}\t{}%".format(i, gg[i], round(100 * gg[i] / hits, 2)))
print("")
print("基因\t计数\t频率")
g = snp["g"]
for i in g:
print("{}\t{}\t{}%".format(i, g[i], round(100 * 0.5 * g[i] / hits, 2)))
print("")
if "D" in g:
print("字母D表示单个碱基缺失")
print("")
elif st.startswith("bm"):
sn = st.upper()
sample = calc_sp(sn)
df = pd.DataFrame().from_dict({"": sample}, orient="index")
df = df.reindex_axis(sorted(df.columns), axis=1)
df.to_csv("{}.csv".format(sn), index=None)
print("信息已写入: {}.csv".format(sn))
else:
usage()
def time_it(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
t1 = time.time()
rt = func(*args, **kwargs)
print("计算用时: {}s".format(round(time.time() - t1, 2)))
return rt
return wrapper
@time_it
def main():
list(map(query_item, set(sys.argv[1:])))
if __name__ == "__main__":
if len(sys.argv) < 2:
usage()
main()
|