Skip to content

Commit

Permalink
Introduce PySam as an alternative parsing library. (#491)
Browse files Browse the repository at this point in the history
  • Loading branch information
tneymanov authored Dec 9, 2019
1 parent 8049c19 commit ba98509
Show file tree
Hide file tree
Showing 19 changed files with 1,491 additions and 153 deletions.
14 changes: 14 additions & 0 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ ADD / /opt/gcp_variant_transforms/src/
# Needed for installing mmh3 (one of the required packages in setup.py).
RUN apt install -y g++

# Install Pysam dependencies. These dependencies are only required because we
# have a monolithic binary - they primarily have to be installed on the workers.
RUN apt-get update && apt-get install -y \
autoconf \
automake \
gcc \
libbz2-dev \
libcurl4-openssl-dev \
liblzma-dev \
libssl-dev \
make \
perl \
zlib1g-dev

# Install dependencies.
RUN python -m pip install --upgrade pip && \
python -m pip install --upgrade virtualenv && \
Expand Down
200 changes: 170 additions & 30 deletions gcp_variant_transforms/beam_io/vcf_header_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

from __future__ import absolute_import

from collections import OrderedDict
import collections
from functools import partial
from typing import Dict, Iterable, List # pylint: disable=unused-import
from pysam import libcbcf
import vcf


import apache_beam as beam
from apache_beam.io import filebasedsource
from apache_beam.io import range_trackers # pylint: disable=unused-import
Expand All @@ -32,6 +34,8 @@
from gcp_variant_transforms.beam_io import bgzf
from gcp_variant_transforms.beam_io import vcfio

LAST_HEADER_LINE_PREFIX = '#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO'


class VcfHeaderFieldTypeConstants(object):
"""Constants for types from VCF header."""
Expand All @@ -52,18 +56,34 @@ class VcfParserHeaderKeyConstants(object):
VERSION = 'version'
LENGTH = 'length'

# Parser-agnostic containers for VCF header INFO and FORMAT records.
VcfHeaderInfoField = collections.namedtuple(
    'Info', ['id', 'num', 'type', 'desc', 'source', 'version'])

VcfHeaderFormatField = collections.namedtuple(
    'Format', ['id', 'num', 'type', 'desc'])

# Maps a stored `num` value back to its VCF "Number" string. PyVCF encodes the
# special counts as negative ints, while PySam keeps the original letters.
VCF_HEADER_INFO_NUM_FIELD_CONVERSION = {
    None: '.',
    -1: 'A',  # PyVCF value when number of alternate alleles dictates the count
    -2: 'G',  # PyVCF value when number of genotypes dictates the count
    -3: 'R',  # PyVCF value when number of alleles and ref dictates the count
    'A': 'A',  # PySam value when number of alternate alleles dictates the count
    'G': 'G',  # PySam value when number of genotypes dictates the count
    'R': 'R',  # PySam value when number of alleles and ref dictates the count
}

class VcfHeader(object):
"""Container for header data."""

def __init__(self,
infos=None, # type: OrderedDict[str, vcf.parser._Info]
filters=None, # type: OrderedDict[str, vcf.parser._Filter]
alts=None, # type: OrderedDict[str, vcf.parser._Alt]
formats=None, # type: OrderedDict[str, vcf.parser._Format]
contigs=None, # type: OrderedDict[str, vcf.parser._Contig]
samples=None, # type: List[str]
file_path=None # type: str
infos=None, # type: Any
filters=None, # type: Any
alts=None, # type: Any
formats=None, # type: Any
contigs=None, # type: Any
samples=None, # type: Any
file_path=None, # type: str
vcf_parser=vcfio.VcfParserType.PYVCF # type: vcfio.VcfParserType
):
# type: (...) -> None
"""Initializes a VcfHeader object.
Expand All @@ -81,13 +101,21 @@ def __init__(self,
samples: A list of sample names.
file_path: The full file path of the vcf file.
"""
# type: OrderedDict[str, OrderedDict]
self.infos = self._values_asdict(infos or {})
self.filters = self._values_asdict(filters or {})
self.alts = self._values_asdict(alts or {})
self.formats = self._values_asdict(formats or {})
self.contigs = self._values_asdict(contigs or {})
self.samples = samples
# type: collections.OrderedDict[str, collections.OrderedDict]
if vcf_parser == vcfio.VcfParserType.PYSAM:
self.infos = self._get_infos_pysam(infos)
self.filters = self._get_filters_pysam(filters)
self.alts = self._get_alts_pysam(alts)
self.formats = self._get_formats_pysam(formats)
self.contigs = self._get_contigs_pysam(contigs)
self.samples = self._get_samples_pysam(samples)
else:
self.infos = self._values_asdict(infos or {})
self.filters = self._values_asdict(filters or {})
self.alts = self._values_asdict(alts or {})
self.formats = self._values_asdict(formats or {})
self.contigs = self._values_asdict(contigs or {})
self.samples = samples
self.file_path = file_path

def __eq__(self, other):
Expand All @@ -106,14 +134,75 @@ def __repr__(self):

def _values_asdict(self, header):
"""Converts PyVCF header values to ordered dictionaries."""
ordered_dict = OrderedDict()
ordered_dict = collections.OrderedDict()
for key in header:
# These methods were not designed to be protected. They start with an
# underscore to avoid conflicts with field names. For more info, see
# https://docs.python.org/2/library/collections.html#collections.namedtuple
ordered_dict[key] = header[key]._asdict() # pylint: disable=W0212
return ordered_dict

def _get_infos_pysam(self, infos):
results = collections.OrderedDict()
for item in infos.items():
result = collections.OrderedDict()
result['id'] = item[0]
result['num'] = item[1].number if item[1].number != '.' else None
result['type'] = item[1].type
result['desc'] = item[1].description
# Pysam doesn't return these fields in info
result['source'] = None
result['version'] = None
results[item[0]] = result
return dict(results.items())

def _get_filters_pysam(self, filters):
results = collections.OrderedDict()
for item in filters.items():
result = collections.OrderedDict()
result['id'] = item[0]
result['desc'] = item[1].description
results[item[0]] = result
# PySAM adds default PASS value to its filters
del results['PASS']
return dict(results.items())

def _get_alts_pysam(self, alts):
results = collections.OrderedDict()
for item in alts.items():
result = collections.OrderedDict()
result['id'] = item[0]
result['desc'] = item[1]['Description'].strip("\"")
results[item[0]] = result
return dict(results.items())

def _get_formats_pysam(self, formats):
results = collections.OrderedDict()
for item in formats.items():
result = collections.OrderedDict()
result['id'] = item[0]
result['num'] = item[1].number if item[1].number != '.' else None
result['type'] = item[1].type
result['desc'] = item[1].description
results[item[0]] = result
return dict(results.items())

def _get_contigs_pysam(self, contigs):
results = collections.OrderedDict()
for item in contigs.items():
result = collections.OrderedDict()
result['id'] = item[0]
result['length'] = item[1].length
results[item[0]] = result
return dict(results.items())

def _get_samples_pysam(self, sample_line):
sample_tags = sample_line.split('\t')
if len(sample_tags) > 9:
return sample_tags[9:]
else:
return []


class VcfHeaderSource(filebasedsource.FileBasedSource):
"""A source for reading VCF file headers.
Expand All @@ -124,15 +213,28 @@ class VcfHeaderSource(filebasedsource.FileBasedSource):
def __init__(self,
             file_pattern,
             compression_type=CompressionTypes.AUTO,
             validate=True,
             vcf_parser=vcfio.VcfParserType.PYVCF):
  # type: (str, str, bool, vcfio.VcfParserType) -> None
  """Initializes the header source.

  Args:
    file_pattern: The file path or pattern to read headers from.
    compression_type: Compression of the input file(s).
    validate: Whether to verify the pattern matches at least one file.
    vcf_parser: Which parsing library to use for reading the headers.
  """
  # Headers live at the top of each file, so the source is never splittable.
  super(VcfHeaderSource, self).__init__(
      file_pattern,
      compression_type=compression_type,
      validate=validate,
      splittable=False)
  self._vcf_parser = vcf_parser
  self._compression_type = compression_type

def read_records(
    self,
    file_path,  # type: str
    unused_range_tracker,  # type: range_trackers.UnsplittableRangeTracker
):
  # type: (...) -> Iterable[VcfHeader]
  """Dispatches to the parser-specific header reader for `file_path`."""
  reader = (self._read_records_pysam
            if self._vcf_parser == vcfio.VcfParserType.PYSAM
            else self._read_records_pyvcf)
  return reader(file_path, unused_range_tracker)

def _read_records_pyvcf(
self,
file_path, # type: str
unused_range_tracker # type: range_trackers.UnsplittableRangeTracker
Expand All @@ -142,26 +244,54 @@ def read_records(
vcf_reader = vcf.Reader(fsock=self._read_headers(file_path))
except StopIteration:
raise ValueError('{} has no header.'.format(file_path))

yield VcfHeader(infos=vcf_reader.infos,
filters=vcf_reader.filters,
alts=vcf_reader.alts,
formats=vcf_reader.formats,
contigs=vcf_reader.contigs,
samples=vcf_reader.samples,
file_path=file_path)
file_path=file_path,
vcf_parser=vcfio.VcfParserType.PYVCF)

def _read_headers(self, file_path):
with self.open_file(file_path) as file_to_read:
while True:
record = file_to_read.readline()
while not record or not record.strip(): # Skip empty lines.
while record and not record.strip(): # Skip empty lines.
record = file_to_read.readline()
if record and record.startswith('#'):
yield record
yield record.strip()
else:
break

def _read_records_pysam(
    self,
    file_path,  # type: str
    unused_range_tracker  # type: range_trackers.UnsplittableRangeTracker
):
  # type: (...) -> Iterable[VcfHeader]
  """Parses the file's header lines with PySam and yields one VcfHeader."""
  header = libcbcf.VariantHeader()
  header.add_line('##fileformat=VCFv4.0')
  # Fall back to the bare '#CHROM ...' prefix when the file has no sample line.
  sample_line = LAST_HEADER_LINE_PREFIX
  for line in self._read_headers(file_path):
    if not line.startswith('#'):
      break
    if line.startswith(LAST_HEADER_LINE_PREFIX):
      # The '#CHROM' line ends the header; keep it for sample extraction.
      sample_line = line.strip()
      break
    header.add_line(line.strip())
  yield VcfHeader(infos=header.info,
                  filters=header.filters,
                  alts=header.alts,
                  formats=header.formats,
                  contigs=header.contigs,
                  samples=sample_line,
                  file_path=file_path,
                  vcf_parser=vcfio.VcfParserType.PYSAM)

def open_file(self, file_path):
if self._compression_type == CompressionTypes.GZIP:
return bgzf.open_bgzf(file_path)
Expand All @@ -181,6 +311,7 @@ def __init__(
file_pattern, # type: str
compression_type=CompressionTypes.AUTO, # type: str
validate=True, # type: bool
vcf_parser=vcfio.VcfParserType.PYVCF, # type: vcfio.VcfParserType
**kwargs # type: **str
):
# type: (...) -> None
Expand All @@ -198,15 +329,22 @@ def __init__(
"""
super(ReadVcfHeaders, self).__init__(**kwargs)
self._source = VcfHeaderSource(
file_pattern, compression_type, validate=validate)
file_pattern,
compression_type,
validate=validate,
vcf_parser=vcf_parser)

def expand(self, pvalue):
return pvalue.pipeline | Read(self._source)


def _create_vcf_header_source(file_pattern=None, compression_type=None):
def _create_vcf_header_source(
    file_pattern=None,
    compression_type=None,
    vcf_parser=vcfio.VcfParserType.PYVCF):
  """Builds a VcfHeaderSource; used as a per-file factory by ReadAllVcfHeaders."""
  return VcfHeaderSource(file_pattern=file_pattern,
                         compression_type=compression_type,
                         vcf_parser=vcf_parser)


class ReadAllVcfHeaders(PTransform):
Expand All @@ -226,8 +364,9 @@ def __init__(
self,
desired_bundle_size=DEFAULT_DESIRED_BUNDLE_SIZE,
compression_type=CompressionTypes.AUTO,
vcf_parser=vcfio.VcfParserType.PYVCF,
**kwargs):
# type: (int, str, **str) -> None
# type: (int, str, **str, vcfio.VcfParserType) -> None
"""Initialize the :class:`ReadAllVcfHeaders` transform.
Args:
Expand All @@ -242,7 +381,9 @@ def __init__(
"""
super(ReadAllVcfHeaders, self).__init__(**kwargs)
source_from_file = partial(
_create_vcf_header_source, compression_type=compression_type)
_create_vcf_header_source,
compression_type=compression_type,
vcf_parser=vcf_parser)
self._read_all_files = filebasedsource.ReadAllFiles(
False, # splittable (we are just reading the headers)
CompressionTypes.AUTO, desired_bundle_size,
Expand Down Expand Up @@ -406,9 +547,8 @@ def _format_number(self, number):
return None
elif number >= 0:
return str(number)
number_to_string = {v: k for k, v in vcf.parser.field_counts.items()}
if number in number_to_string:
return number_to_string[number]
if number in VCF_HEADER_INFO_NUM_FIELD_CONVERSION:
return VCF_HEADER_INFO_NUM_FIELD_CONVERSION[number]
else:
raise ValueError('Invalid value for number: {}'.format(number))

Expand Down
Loading

0 comments on commit ba98509

Please sign in to comment.