from sam2lca.parse_alignment import Alignment
from sam2lca.acc2tax import get_mapping
from sam2lca.lca import compute_lca
from sam2lca import utils
from sam2lca.taxonomy import setup_taxopy_db, load_taxonomy_db
from sam2lca.write_alignment import write_bam_tags, write_bam_by_taxid
from pathlib import Path
import logging
from multiprocessing import cpu_count
from tqdm.contrib.concurrent import thread_map
from functools import partial
import sys
import os
from pprint import pprint
[docs]def sam2lca(
sam,
output=None,
dbdir=f"{str(Path.home())}/.sam2lca",
taxonomy="ncbi",
acc2tax="nucl",
process=2,
identity=0.8,
distance=None,
length=30,
conserved=False,
bam_out=False,
bam_split_rank=False,
bam_split_read=50,
):
"""Performs LCA on SAM/BAM/CRAM alignment file
Args:
sam (str): Path to SAM/BAM/CRAM alignment file
output(str): Path to sam2lca output file
dbdir (str): Path to database storing directory
taxonomy(str): Type of Taxonomy database
acc2tax(str): Type of acc2tax database
process (int): Number of process for parallelization
identity(float): Minimum alignment identity threshold
edit_distance(int): Maximum edit distance threshold
length(int): Minimum alignment length
bam_out(bool): Write BAM output file with XT tag for TAXID
bam_split_rank(str): Rank to split BAM output file by TAXID
bam_split_read(int): Minimum number of reads to split BAM output file by TAXID
"""
nb_steps = 8 if conserved else 7
process = cpu_count() if process == 0 else process
Path(dbdir).mkdir(exist_ok=True)
avail_taxo, avail_acc2tax = list_available_db(dbdir)
if taxonomy not in avail_taxo:
logging.info(
f"{taxonomy} taxonomy database is not installed. I'm trying to create it for you"
)
update_database(taxonomy=taxonomy, dbdir=dbdir, acc2tax=None)
if acc2tax not in avail_acc2tax:
logging.info(
f"{acc2tax} acc2tax database is ls not installed. I'm trying to create it for you"
)
update_database(acc2tax=acc2tax, dbdir=dbdir)
logging.info(f"Step 1/{nb_steps}: Loading taxonomy database")
TAXDB = load_taxonomy_db(dbdir, taxonomy)
acc2tax_db = f"{dbdir}/{acc2tax}.db"
p = Path(acc2tax_db)
if not p.exists():
logging.error(
f"\t'{acc2tax_db}' database seems to be missing, please check your inputs"
)
sys.exit(1)
if not output:
output = utils.output_file(sam)
else:
output = utils.output_file(output)
utils.check_extension(sam)
al = Alignment(al_file=sam, nb_steps=nb_steps)
al.get_refs_taxid(acc2tax_db, TAXDB)
read_dict = al.get_reads(
process=process,
identity=identity,
edit_distance=distance,
minlength=length,
check_conserved=conserved,
)
reads_taxid_dict = compute_lca(read_dict, process, nb_steps, taxo_db=TAXDB)
taxid_counts = utils.count_reads_taxid(reads_taxid_dict)
taxid_info_dict = utils.taxid_to_lineage(
taxid_counts,
output["sam2lca"],
process=process,
nb_steps=nb_steps,
taxo_db=TAXDB,
)
if bam_out and bam_split_rank and taxid_info_dict:
logging.info(f"* BAM files split by TAXID at the {bam_split_rank} level")
taxids_list = utils.filter_taxid_info_dict(
taxid_info_dict,
rank=bam_split_rank,
minread=bam_split_read
).keys()
write_bam_by_taxid(
target_taxids=taxids_list,
infile=sam,
outfile_base=output["bam"],
total_reads = al.total_reads,
read_taxid_dict=reads_taxid_dict,
acc2tax_dict=al.acc2tax,
taxid_info_dict=taxid_info_dict,
identity=identity,
edit_distance=distance,
minlength=length,
process=process
)
if bam_out and not bam_split_rank and taxid_info_dict:
write_bam_tags(
infile=sam,
outfile=output["bam"],
total_reads=al.total_reads,
read_taxid_dict=reads_taxid_dict,
taxid_info_dict=taxid_info_dict,
identity=identity,
edit_distance=distance,
minlength=length,
process=process
)
return taxid_info_dict
[docs]def update_database(
dbdir=f"{str(Path.home())}/.sam2lca",
taxonomy=None,
taxo_names=None,
taxo_nodes=None,
taxo_merged=None,
acc2tax="nucl",
acc2tax_json="https://raw.githubusercontent.com/maxibor/sam2lca/master/data/acc2tax.json",
):
"""Performs LCA on SAM/BAM/CRAM alignment file
Args:
dbdir (str): Path to database storing directory
taxonomy(str): Name of Taxonomy database
names(str): names.dmp file for taxonomy database. None loads the NCBI taxonomy database
nodes(str): nodes.dmp file for taxonomy database. None loads the NCBI taxonomy database
merged(str): merged.dmp file for taxonomy database. None loads the NCBI taxonomy database
acc2tax(str): Type of acc2tax database
acc2tax_json(str): Path to acc2tax json file
"""
map_config = utils.get_json(json_path=acc2tax_json)
if acc2tax is not None:
logging.info(f"* Downloading/updating acc2tax {acc2tax} database")
get_mapping(map_config=map_config, maptype=acc2tax, dbdir=dbdir)
if taxonomy:
logging.info(f"* Setting up {taxonomy} taxonomy database")
setup_taxopy_db(
dbdir=dbdir,
db_type=taxonomy,
nodes=taxo_nodes,
names=taxo_names,
merged=taxo_merged,
)
[docs]def list_available_db(dbdir, verbose=False):
"""List available taxonomy databases
Args:
db_dir(str): Path to sam2lca database directory
Returns:
list: List of available taxonomy databases
list: List of available acc2tax databases
"""
taxo_db = []
acc2tax_db = []
if os.path.exists(dbdir):
for file in os.listdir(dbdir):
if file.endswith(".pkl"):
taxo_db.append(file.split(".")[0])
continue
if file.endswith(".db"):
acc2tax_db.append(file.split(".")[0])
continue
if verbose:
logging.info(f"* Available taxonomy databases: {', '.join(taxo_db)}")
logging.info(f"* Available acc2tax databases: {', '.join(acc2tax_db)}")
return taxo_db, acc2tax_db
else:
logging.error(f"* {dbdir} does not exist, please create sam2lca database first")
sys.exit(1)
if __name__ == "__main__":
sam2lca(
sam=f"{Path(__file__).parent.resolve()}/../tests/data/aligned.sorted.bam",
conserved=False,
tree=f"{Path(__file__).parent.resolve()}/../tests/data/taxonomy/test.tree",
mappings="test",
process=0,
bam_out=True,
)