Skip to content

File Readers

Input file parsers for various sequence formats.

Overview

PyPopART supports multiple sequence file formats:

  • FASTA: Most common format
  • NEXUS: With metadata support
  • PHYLIP: Sequential and interleaved
  • GenBank: Full GenBank entries

Modules

pypopart.io.fasta

FASTA file format reader and writer for PyPopART.

FastaReader

Reader for FASTA format sequence files.

Supports plain text, gzip, and zip compressed files.

Source code in src/pypopart/io/fasta.py
class FastaReader:
    """
    Reader for FASTA format sequence files.

    Supports plain text, gzip, and zip compressed files.
    """

    def __init__(self, filepath: Union[str, Path], validate: bool = True):
        """
        Initialize FASTA reader.

        Parameters
        ----------
        filepath :
            Path to FASTA file.
        validate :
            Whether to validate sequences.
        """
        self.filepath = Path(filepath)
        self.validate = validate

        if not self.filepath.exists():
            raise FileNotFoundError(f'File not found: {filepath}')

    def _open_file(self) -> TextIO:
        """
        Open file handling compression automatically.

        Returns
        -------
            File handle.
        """
        if self.filepath.suffix == '.gz':
            return gzip.open(self.filepath, 'rt')
        elif self.filepath.suffix == '.zip':
            with zipfile.ZipFile(self.filepath) as zf:
                # Assume first file in zip is the FASTA
                names = zf.namelist()
                if not names:
                    raise ValueError('Empty zip file')
                return zf.open(names[0], 'r')
        else:
            return open(self.filepath, 'r')

    def read_sequences(self, progress_callback=None) -> Iterator[Sequence]:
        """
        Read sequences from FASTA file.

        Parameters
        ----------
        progress_callback :
            Optional callback function(current, total).

        Yields :
            Sequence objects.
        """
        count = 0
        with self._open_file() as handle:
            for record in SeqIO.parse(handle, 'fasta'):
                # Extract metadata from description
                metadata = {}
                if '|' in record.description:
                    # Parse pipe-separated metadata
                    parts = record.description.split('|')
                    if len(parts) > 1:
                        for part in parts[1:]:
                            if '=' in part:
                                key, value = part.split('=', 1)
                                metadata[key.strip()] = value.strip()

                seq = Sequence(
                    id=record.id,
                    data=str(record.seq).upper(),
                    description=record.description,
                    metadata=metadata,
                )

                # Validation happens automatically in Sequence.__init__

                count += 1
                if progress_callback:
                    progress_callback(count, None)

                yield seq

    def read_alignment(self, progress_callback=None) -> Alignment:
        """
            Read alignment from FASTA file.

        Parameters
        ----------
            progress_callback :
                Optional callback function(current, total).

        Returns
        -------
            Alignment object.
        """
        sequences = list(self.read_sequences(progress_callback))
        alignment = Alignment(sequences)

        if self.validate:
            alignment.validate()

        return alignment
__init__
__init__(filepath: Union[str, Path], validate: bool = True)

Initialize FASTA reader.

Parameters:

Name Type Description Default
filepath Union[str, Path]

Path to FASTA file.

required
validate bool

Whether to validate sequences.

True
Source code in src/pypopart/io/fasta.py
def __init__(self, filepath: Union[str, Path], validate: bool = True):
    """
    Initialize FASTA reader.

    Parameters
    ----------
    filepath :
        Path to FASTA file.
    validate :
        Whether to validate sequences.
    """
    self.filepath = Path(filepath)
    self.validate = validate

    if not self.filepath.exists():
        raise FileNotFoundError(f'File not found: {filepath}')
read_sequences
read_sequences(
    progress_callback=None,
) -> Iterator[Sequence]

Read sequences from FASTA file.

Parameters:

Name Type Description Default
progress_callback

Optional callback function(current, total).

None
Yields

Sequence objects.

required
Source code in src/pypopart/io/fasta.py
def read_sequences(self, progress_callback=None) -> Iterator[Sequence]:
    """
    Read sequences from FASTA file.

    Parameters
    ----------
    progress_callback :
        Optional callback function(current, total).

    Yields :
        Sequence objects.
    """
    count = 0
    with self._open_file() as handle:
        for record in SeqIO.parse(handle, 'fasta'):
            # Extract metadata from description
            metadata = {}
            if '|' in record.description:
                # Parse pipe-separated metadata
                parts = record.description.split('|')
                if len(parts) > 1:
                    for part in parts[1:]:
                        if '=' in part:
                            key, value = part.split('=', 1)
                            metadata[key.strip()] = value.strip()

            seq = Sequence(
                id=record.id,
                data=str(record.seq).upper(),
                description=record.description,
                metadata=metadata,
            )

            # Validation happens automatically in Sequence.__init__

            count += 1
            if progress_callback:
                progress_callback(count, None)

            yield seq
read_alignment
read_alignment(progress_callback=None) -> Alignment
Read alignment from FASTA file.

Returns:

Type Description
Alignment object.
Source code in src/pypopart/io/fasta.py
def read_alignment(self, progress_callback=None) -> Alignment:
    """
        Read alignment from FASTA file.

    Parameters
    ----------
        progress_callback :
            Optional callback function(current, total).

    Returns
    -------
        Alignment object.
    """
    sequences = list(self.read_sequences(progress_callback))
    alignment = Alignment(sequences)

    if self.validate:
        alignment.validate()

    return alignment

FastaWriter

Writer for FASTA format sequence files.

Source code in src/pypopart/io/fasta.py
class FastaWriter:
    """Writer for FASTA format sequence files."""

    def __init__(
        self,
        filepath: Union[str, Path],
        line_length: int = 80,
        compress: Optional[str] = None,
    ):
        """
        Initialize FASTA writer.

        Parameters
        ----------
        filepath :
            Output file path.
        line_length :
            Maximum line length for sequences (0 for no wrapping).
        compress :
            Compression format ('gzip' or None).
        """
        self.filepath = Path(filepath)
        self.line_length = line_length
        self.compress = compress

        # Add compression extension if needed
        if compress == 'gzip' and not str(self.filepath).endswith('.gz'):
            self.filepath = Path(str(self.filepath) + '.gz')

    def _open_file(self) -> TextIO:
        """
        Open file for writing with optional compression.

        Returns
        -------
            File handle.
        """
        if self.compress == 'gzip':
            return gzip.open(self.filepath, 'wt')
        else:
            return open(self.filepath, 'w')

    def write_sequences(
        self, sequences: Iterator[Sequence], progress_callback=None
    ) -> int:
        """
            Write sequences to FASTA file.

        Parameters
        ----------
            sequences :
                Iterable of Sequence objects.
            progress_callback :
                Optional callback function(current, total).

        Returns
        -------
            Number of sequences written.
        """
        count = 0

        with self._open_file() as handle:
            for seq in sequences:
                # Build header
                header = f'>{seq.id}'
                if seq.description and seq.description != seq.id:
                    header = f'>{seq.description}'

                # Add metadata to header
                if seq.metadata:
                    metadata_str = '|'.join(f'{k}={v}' for k, v in seq.metadata.items())
                    header += f'|{metadata_str}'

                handle.write(header + '\n')

                # Write sequence data with line wrapping
                if self.line_length > 0:
                    for i in range(0, len(seq.data), self.line_length):
                        handle.write(seq.data[i : i + self.line_length] + '\n')
                else:
                    handle.write(seq.data + '\n')

                count += 1
                if progress_callback:
                    progress_callback(count, None)

        return count

    def write_alignment(self, alignment: Alignment, progress_callback=None) -> int:
        """
            Write alignment to FASTA file.

        Parameters
        ----------
            alignment :
                Alignment object.
            progress_callback :
                Optional callback function(current, total).

        Returns
        -------
            Number of sequences written.
        """
        return self.write_sequences(iter(alignment), progress_callback)
__init__
__init__(
    filepath: Union[str, Path],
    line_length: int = 80,
    compress: Optional[str] = None,
)

Initialize FASTA writer.

Parameters:

Name Type Description Default
filepath Union[str, Path]

Output file path.

required
line_length int

Maximum line length for sequences (0 for no wrapping).

80
compress Optional[str]

Compression format ('gzip' or None).

None
Source code in src/pypopart/io/fasta.py
def __init__(
    self,
    filepath: Union[str, Path],
    line_length: int = 80,
    compress: Optional[str] = None,
):
    """
    Initialize FASTA writer.

    Parameters
    ----------
    filepath :
        Output file path.
    line_length :
        Maximum line length for sequences (0 for no wrapping).
    compress :
        Compression format ('gzip' or None).
    """
    self.filepath = Path(filepath)
    self.line_length = line_length
    self.compress = compress

    # Add compression extension if needed
    if compress == 'gzip' and not str(self.filepath).endswith('.gz'):
        self.filepath = Path(str(self.filepath) + '.gz')
write_sequences
write_sequences(
    sequences: Iterator[Sequence], progress_callback=None
) -> int
Write sequences to FASTA file.

Returns:

Type Description
Number of sequences written.
Source code in src/pypopart/io/fasta.py
def write_sequences(
    self, sequences: Iterator[Sequence], progress_callback=None
) -> int:
    """
        Write sequences to FASTA file.

    Parameters
    ----------
        sequences :
            Iterable of Sequence objects.
        progress_callback :
            Optional callback function(current, total).

    Returns
    -------
        Number of sequences written.
    """
    count = 0

    with self._open_file() as handle:
        for seq in sequences:
            # Build header
            header = f'>{seq.id}'
            if seq.description and seq.description != seq.id:
                header = f'>{seq.description}'

            # Add metadata to header
            if seq.metadata:
                metadata_str = '|'.join(f'{k}={v}' for k, v in seq.metadata.items())
                header += f'|{metadata_str}'

            handle.write(header + '\n')

            # Write sequence data with line wrapping
            if self.line_length > 0:
                for i in range(0, len(seq.data), self.line_length):
                    handle.write(seq.data[i : i + self.line_length] + '\n')
            else:
                handle.write(seq.data + '\n')

            count += 1
            if progress_callback:
                progress_callback(count, None)

    return count
write_alignment
write_alignment(
    alignment: Alignment, progress_callback=None
) -> int
Write alignment to FASTA file.

Returns:

Type Description
Number of sequences written.
Source code in src/pypopart/io/fasta.py
def write_alignment(self, alignment: Alignment, progress_callback=None) -> int:
    """
        Write alignment to FASTA file.

    Parameters
    ----------
        alignment :
            Alignment object.
        progress_callback :
            Optional callback function(current, total).

    Returns
    -------
        Number of sequences written.
    """
    return self.write_sequences(iter(alignment), progress_callback)

pypopart.io.nexus

NEXUS file format reader and writer for PyPopART.

Supports PopART-style NEXUS files with traits blocks.

NexusReader

Reader for NEXUS format files.

Supports PopART-style NEXUS with traits blocks.

Source code in src/pypopart/io/nexus.py
class NexusReader:
    """
    Reader for NEXUS format files.

    Supports PopART-style NEXUS with traits blocks.
    """

    def __init__(self, filepath: Union[str, Path], validate: bool = True):
        """
        Initialize NEXUS reader.

        Parameters
        ----------
        filepath :
            Path to NEXUS file.
        validate :
            Whether to validate sequences and alignment.
        """
        self.filepath = Path(filepath)
        self.validate = validate

        if not self.filepath.exists():
            raise FileNotFoundError(f'File not found: {filepath}')

        self.traits = {}

    def _open_file(self) -> TextIO:
        """Open file handling gzip compression."""
        if self.filepath.suffix == '.gz':
            return gzip.open(self.filepath, 'rt')
        else:
            return open(self.filepath, 'r')

    def _parse_dimensions(self, content: str) -> Tuple[int, int]:
        """
            Parse DIMENSIONS block.

        Parameters
        ----------
            content :
                NEXUS file content.

        Returns
        -------
            Tuple of (ntax, nchar).
        """
        dimensions_match = re.search(
            r'DIMENSIONS\s+NTAX=(\d+)\s+NCHAR=(\d+)', content, re.IGNORECASE
        )

        if dimensions_match:
            ntax = int(dimensions_match.group(1))
            nchar = int(dimensions_match.group(2))
            return ntax, nchar

        return 0, 0

    def _parse_matrix(self, content: str) -> Dict[str, str]:
        """
            Parse MATRIX block.

        Parameters
        ----------
            content :
                NEXUS file content.

        Returns
        -------
            Dictionary mapping sequence IDs to sequence data.
        """
        sequences = {}

        # Find MATRIX block
        matrix_match = re.search(
            r'MATRIX\s+(.*?);\s*END;', content, re.IGNORECASE | re.DOTALL
        )

        if not matrix_match:
            raise ValueError('No MATRIX block found in NEXUS file')

        matrix_content = matrix_match.group(1)

        # Parse sequences (handle both interleaved and sequential formats)
        current_id = None

        for line in matrix_content.split('\n'):
            line = line.strip()
            if not line:
                continue

            # Check if line starts with sequence ID
            parts = line.split(None, 1)
            if len(parts) == 2:
                seq_id, seq_data = parts
                seq_data = seq_data.replace(' ', '').replace('\t', '')

                if seq_id in sequences:
                    # Interleaved format - append to existing sequence
                    sequences[seq_id] += seq_data
                else:
                    # New sequence
                    sequences[seq_id] = seq_data
            elif len(parts) == 1:
                # Continuation of previous sequence
                seq_data = parts[0].replace(' ', '').replace('\t', '')
                if current_id:
                    sequences[current_id] += seq_data

        return sequences

    def _parse_traits(self, content: str) -> Dict[str, Dict[str, str]]:
        """
            Parse TRAITS block (PopART extension).

        Parameters
        ----------
            content :
                NEXUS file content.

        Returns
        -------
            Dictionary mapping sequence IDs to trait dictionaries.
        """
        traits = {}

        # Find TRAITS block
        traits_match = re.search(
            r'BEGIN TRAITS;(.*?)END;', content, re.IGNORECASE | re.DOTALL
        )

        if not traits_match:
            return traits

        traits_content = traits_match.group(1)

        # Parse TRAITLABELS
        labels_match = re.search(r'TRAITLABELS\s+(.*?);', traits_content, re.IGNORECASE)

        trait_labels = []
        if labels_match:
            trait_labels = labels_match.group(1).split()

        # Parse MATRIX
        matrix_match = re.search(
            r'MATRIX\s+(.*?);', traits_content, re.IGNORECASE | re.DOTALL
        )

        if matrix_match:
            matrix_content = matrix_match.group(1)
            for line in matrix_content.split('\n'):
                line = line.strip()
                if not line:
                    continue

                parts = line.split()
                if len(parts) >= 2:
                    seq_id = parts[0]
                    trait_values = parts[1:]

                    traits[seq_id] = {}
                    for i, value in enumerate(trait_values):
                        label = (
                            trait_labels[i] if i < len(trait_labels) else f'trait_{i}'
                        )
                        traits[seq_id][label] = value

        return traits

    def read_alignment(self, progress_callback=None) -> Alignment:
        """
            Read alignment from NEXUS file.

        Parameters
        ----------
            progress_callback :
                Optional callback function(current, total).

        Returns
        -------
            Alignment object with metadata.
        """
        with self._open_file() as handle:
            content = handle.read()

        # Parse dimensions
        ntax, nchar = self._parse_dimensions(content)

        # Parse sequences
        seq_dict = self._parse_matrix(content)

        # Parse traits
        self.traits = self._parse_traits(content)

        # Create Sequence objects
        sequences = []
        count = 0

        for seq_id, seq_data in seq_dict.items():
            metadata = self.traits.get(seq_id, {})

            seq = Sequence(id=seq_id, data=seq_data.upper(), metadata=metadata)

            # Validation happens automatically in Sequence.__init__

            sequences.append(seq)

            count += 1
            if progress_callback:
                progress_callback(count, ntax)

        alignment = Alignment(sequences)

        if self.validate:
            alignment.validate()

        return alignment

    def get_traits(self) -> Dict[str, Dict[str, str]]:
        """
        Get parsed traits/metadata.

        Returns
        -------
            Dictionary mapping sequence IDs to trait dictionaries.
        """
        return self.traits
__init__
__init__(filepath: Union[str, Path], validate: bool = True)

Initialize NEXUS reader.

Parameters:

Name Type Description Default
filepath Union[str, Path]

Path to NEXUS file.

required
validate bool

Whether to validate sequences and alignment.

True
Source code in src/pypopart/io/nexus.py
def __init__(self, filepath: Union[str, Path], validate: bool = True):
    """
    Initialize NEXUS reader.

    Parameters
    ----------
    filepath :
        Path to NEXUS file.
    validate :
        Whether to validate sequences and alignment.
    """
    self.filepath = Path(filepath)
    self.validate = validate

    if not self.filepath.exists():
        raise FileNotFoundError(f'File not found: {filepath}')

    self.traits = {}
read_alignment
read_alignment(progress_callback=None) -> Alignment
Read alignment from NEXUS file.

Returns:

Type Description
Alignment object with metadata.
Source code in src/pypopart/io/nexus.py
def read_alignment(self, progress_callback=None) -> Alignment:
    """
        Read alignment from NEXUS file.

    Parameters
    ----------
        progress_callback :
            Optional callback function(current, total).

    Returns
    -------
        Alignment object with metadata.
    """
    with self._open_file() as handle:
        content = handle.read()

    # Parse dimensions
    ntax, nchar = self._parse_dimensions(content)

    # Parse sequences
    seq_dict = self._parse_matrix(content)

    # Parse traits
    self.traits = self._parse_traits(content)

    # Create Sequence objects
    sequences = []
    count = 0

    for seq_id, seq_data in seq_dict.items():
        metadata = self.traits.get(seq_id, {})

        seq = Sequence(id=seq_id, data=seq_data.upper(), metadata=metadata)

        # Validation happens automatically in Sequence.__init__

        sequences.append(seq)

        count += 1
        if progress_callback:
            progress_callback(count, ntax)

    alignment = Alignment(sequences)

    if self.validate:
        alignment.validate()

    return alignment
get_traits
get_traits() -> Dict[str, Dict[str, str]]

Get parsed traits/metadata.

Returns:

Type Description
Dictionary mapping sequence IDs to trait dictionaries.
Source code in src/pypopart/io/nexus.py
def get_traits(self) -> Dict[str, Dict[str, str]]:
    """
    Get parsed traits/metadata.

    Returns
    -------
        Dictionary mapping sequence IDs to trait dictionaries.
    """
    return self.traits

NexusWriter

Writer for NEXUS format files.

Supports PopART-style NEXUS with traits blocks.

Source code in src/pypopart/io/nexus.py
class NexusWriter:
    """
    Writer for NEXUS format files.

    Supports PopART-style NEXUS with traits blocks.
    """

    def __init__(
        self,
        filepath: Union[str, Path],
        interleaved: bool = False,
        compress: Optional[str] = None,
    ):
        """
        Initialize NEXUS writer.

        Parameters
        ----------
        filepath :
            Output file path.
        interleaved :
            Whether to write in interleaved format.
        compress :
            Compression format ('gzip' or None).
        """
        self.filepath = Path(filepath)
        self.interleaved = interleaved
        self.compress = compress

        if compress == 'gzip' and not str(self.filepath).endswith('.gz'):
            self.filepath = Path(str(self.filepath) + '.gz')

    def _open_file(self) -> TextIO:
        """Open file for writing with optional compression."""
        if self.compress == 'gzip':
            return gzip.open(self.filepath, 'wt')
        else:
            return open(self.filepath, 'w')

    def write_alignment(
        self, alignment: Alignment, include_traits: bool = True, progress_callback=None
    ) -> None:
        """
        Write alignment to NEXUS file.

        Parameters
        ----------
        alignment :
            Alignment object.
        include_traits :
            Whether to include traits block.
        progress_callback :
            Optional callback function(current, total).
        """
        with self._open_file() as handle:
            # Write header
            handle.write('#NEXUS\n\n')

            # Write DATA block
            handle.write('BEGIN DATA;\n')
            handle.write(
                f'  DIMENSIONS NTAX={len(alignment)} NCHAR={alignment.length};\n'
            )
            handle.write('  FORMAT DATATYPE=DNA MISSING=? GAP=-;\n')
            handle.write('  MATRIX\n')

            # Write sequences
            count = 0
            for seq in alignment:
                # Pad ID to align sequences
                padded_id = seq.id.ljust(20)
                handle.write(f'    {padded_id} {seq.data}\n')

                count += 1
                if progress_callback:
                    progress_callback(count, len(alignment))

            handle.write('  ;\n')
            handle.write('END;\n')

            # Write TRAITS block if requested and metadata exists
            if include_traits:
                # Collect all trait keys
                all_traits = set()
                for seq in alignment:
                    all_traits.update(seq.metadata.keys())

                if all_traits:
                    handle.write('\n')
                    handle.write('BEGIN TRAITS;\n')
                    handle.write('  DIMENSIONS NTRAITS={};\n'.format(len(all_traits)))

                    # Write trait labels
                    trait_list = sorted(all_traits)
                    handle.write('  TRAITLABELS {};\n'.format(' '.join(trait_list)))

                    # Write trait matrix
                    handle.write('  MATRIX\n')
                    for seq in alignment:
                        padded_id = seq.id.ljust(20)
                        trait_values = [
                            str(seq.metadata.get(trait, '?')) for trait in trait_list
                        ]
                        handle.write(f'    {padded_id} {" ".join(trait_values)}\n')

                    handle.write('  ;\n')
                    handle.write('END;\n')
__init__
__init__(
    filepath: Union[str, Path],
    interleaved: bool = False,
    compress: Optional[str] = None,
)

Initialize NEXUS writer.

Parameters:

Name Type Description Default
filepath Union[str, Path]

Output file path.

required
interleaved bool

Whether to write in interleaved format.

False
compress Optional[str]

Compression format ('gzip' or None).

None
Source code in src/pypopart/io/nexus.py
def __init__(
    self,
    filepath: Union[str, Path],
    interleaved: bool = False,
    compress: Optional[str] = None,
):
    """
    Initialize NEXUS writer.

    Parameters
    ----------
    filepath :
        Output file path.
    interleaved :
        Whether to write in interleaved format.
    compress :
        Compression format ('gzip' or None).
    """
    self.filepath = Path(filepath)
    self.interleaved = interleaved
    self.compress = compress

    if compress == 'gzip' and not str(self.filepath).endswith('.gz'):
        self.filepath = Path(str(self.filepath) + '.gz')
write_alignment
write_alignment(
    alignment: Alignment,
    include_traits: bool = True,
    progress_callback=None,
) -> None

Write alignment to NEXUS file.

Parameters:

Name Type Description Default
alignment Alignment

Alignment object.

required
include_traits bool

Whether to include traits block.

True
progress_callback

Optional callback function(current, total).

None
Source code in src/pypopart/io/nexus.py
def write_alignment(
    self, alignment: Alignment, include_traits: bool = True, progress_callback=None
) -> None:
    """
    Write alignment to NEXUS file.

    Parameters
    ----------
    alignment :
        Alignment object.
    include_traits :
        Whether to include traits block.
    progress_callback :
        Optional callback function(current, total).
    """
    with self._open_file() as handle:
        # Write header
        handle.write('#NEXUS\n\n')

        # Write DATA block
        handle.write('BEGIN DATA;\n')
        handle.write(
            f'  DIMENSIONS NTAX={len(alignment)} NCHAR={alignment.length};\n'
        )
        handle.write('  FORMAT DATATYPE=DNA MISSING=? GAP=-;\n')
        handle.write('  MATRIX\n')

        # Write sequences
        count = 0
        for seq in alignment:
            # Pad ID to align sequences
            padded_id = seq.id.ljust(20)
            handle.write(f'    {padded_id} {seq.data}\n')

            count += 1
            if progress_callback:
                progress_callback(count, len(alignment))

        handle.write('  ;\n')
        handle.write('END;\n')

        # Write TRAITS block if requested and metadata exists
        if include_traits:
            # Collect all trait keys
            all_traits = set()
            for seq in alignment:
                all_traits.update(seq.metadata.keys())

            if all_traits:
                handle.write('\n')
                handle.write('BEGIN TRAITS;\n')
                handle.write('  DIMENSIONS NTRAITS={};\n'.format(len(all_traits)))

                # Write trait labels
                trait_list = sorted(all_traits)
                handle.write('  TRAITLABELS {};\n'.format(' '.join(trait_list)))

                # Write trait matrix
                handle.write('  MATRIX\n')
                for seq in alignment:
                    padded_id = seq.id.ljust(20)
                    trait_values = [
                        str(seq.metadata.get(trait, '?')) for trait in trait_list
                    ]
                    handle.write(f'    {padded_id} {" ".join(trait_values)}\n')

                handle.write('  ;\n')
                handle.write('END;\n')

pypopart.io.phylip

PHYLIP file format reader and writer for PyPopART.

PhylipReader

Reader for PHYLIP format sequence files.

Supports both sequential and interleaved formats.

Source code in src/pypopart/io/phylip.py
class PhylipReader:
    """
    Reader for PHYLIP format sequence files.

    Supports both sequential and interleaved formats.
    """

    def __init__(
        self, filepath: Union[str, Path], strict: bool = False, validate: bool = True
    ):
        """
        Initialize PHYLIP reader.

        Parameters
        ----------
        filepath :
            Path to PHYLIP file.
        strict :
            Whether to use strict format (10-char IDs).
        validate :
            Whether to validate sequences and alignment.
        """
        self.filepath = Path(filepath)
        self.strict = strict
        self.validate = validate

        if not self.filepath.exists():
            raise FileNotFoundError(f'File not found: {filepath}')

    def _open_file(self) -> TextIO:
        """Open file handling gzip compression."""
        if self.filepath.suffix == '.gz':
            return gzip.open(self.filepath, 'rt')
        else:
            return open(self.filepath, 'r')

    def read_alignment(self, progress_callback=None) -> Alignment:
        """
            Read alignment from PHYLIP file.

        Parameters
        ----------
            progress_callback :
                Optional callback function(current, total).

        Returns
        -------
            Alignment object.
        """
        with self._open_file() as handle:
            lines = [line.rstrip() for line in handle if line.strip()]

        if not lines:
            raise ValueError('Empty PHYLIP file')

        # Parse header
        header_parts = lines[0].split()
        if len(header_parts) != 2:
            raise ValueError('Invalid PHYLIP header format')

        ntax = int(header_parts[0])
        nchar = int(header_parts[1])

        # Parse sequences
        sequences = {}
        current_line = 1

        # First pass: read sequence IDs and initial data
        for _i in range(ntax):
            if current_line >= len(lines):
                raise ValueError(f'Unexpected end of file (expected {ntax} sequences)')

            line = lines[current_line]

            if self.strict:
                # Strict format: first 10 characters are ID
                seq_id = line[:10].strip()
                seq_data = line[10:].replace(' ', '').replace('\t', '')
            else:
                # Relaxed format: ID separated by whitespace
                parts = line.split(None, 1)
                if len(parts) < 1:
                    raise ValueError(f'Invalid sequence line: {line}')

                seq_id = parts[0]
                seq_data = (
                    parts[1].replace(' ', '').replace('\t', '')
                    if len(parts) > 1
                    else ''
                )

            sequences[seq_id] = seq_data
            current_line += 1

        # Check if interleaved (more lines after initial block)
        if current_line < len(lines):
            # Interleaved format: read additional blocks
            seq_ids = list(sequences.keys())

            while current_line < len(lines):
                for seq_id in seq_ids:
                    if current_line >= len(lines):
                        break

                    line = lines[current_line].replace(' ', '').replace('\t', '')
                    sequences[seq_id] += line
                    current_line += 1

        # Create Sequence objects
        seq_list = []
        count = 0

        for seq_id, seq_data in sequences.items():
            if len(seq_data) != nchar:
                raise ValueError(
                    f"Sequence {seq_id} length {len(seq_data)} doesn't match expected {nchar}"
                )

            seq = Sequence(id=seq_id, data=seq_data.upper())

            # Validation happens automatically in Sequence.__init__

            seq_list.append(seq)

            count += 1
            if progress_callback:
                progress_callback(count, ntax)

        alignment = Alignment(seq_list)

        if self.validate:
            alignment.validate()

        return alignment
__init__
__init__(
    filepath: Union[str, Path],
    strict: bool = False,
    validate: bool = True,
)

Initialize PHYLIP reader.

Parameters:

Name Type Description Default
filepath Union[str, Path]

Path to PHYLIP file.

required
strict bool

Whether to use strict format (10-char IDs).

False
validate bool

Whether to validate sequences and alignment.

True
Source code in src/pypopart/io/phylip.py
def __init__(
    self, filepath: Union[str, Path], strict: bool = False, validate: bool = True
):
    """
    Initialize PHYLIP reader.

    Parameters
    ----------
    filepath :
        Path to PHYLIP file.
    strict :
        Whether to use strict format (10-char IDs).
    validate :
        Whether to validate sequences and alignment.
    """
    self.filepath = Path(filepath)
    self.strict = strict
    self.validate = validate

    if not self.filepath.exists():
        raise FileNotFoundError(f'File not found: {filepath}')
read_alignment
read_alignment(progress_callback=None) -> Alignment
Read alignment from PHYLIP file.

Returns:

Type Description
Alignment object.
Source code in src/pypopart/io/phylip.py
def read_alignment(self, progress_callback=None) -> Alignment:
    """
        Read alignment from PHYLIP file.

    Parameters
    ----------
        progress_callback :
            Optional callback function(current, total).

    Returns
    -------
        Alignment object.
    """
    with self._open_file() as handle:
        lines = [line.rstrip() for line in handle if line.strip()]

    if not lines:
        raise ValueError('Empty PHYLIP file')

    # Parse header
    header_parts = lines[0].split()
    if len(header_parts) != 2:
        raise ValueError('Invalid PHYLIP header format')

    ntax = int(header_parts[0])
    nchar = int(header_parts[1])

    # Parse sequences
    sequences = {}
    current_line = 1

    # First pass: read sequence IDs and initial data
    for _i in range(ntax):
        if current_line >= len(lines):
            raise ValueError(f'Unexpected end of file (expected {ntax} sequences)')

        line = lines[current_line]

        if self.strict:
            # Strict format: first 10 characters are ID
            seq_id = line[:10].strip()
            seq_data = line[10:].replace(' ', '').replace('\t', '')
        else:
            # Relaxed format: ID separated by whitespace
            parts = line.split(None, 1)
            if len(parts) < 1:
                raise ValueError(f'Invalid sequence line: {line}')

            seq_id = parts[0]
            seq_data = (
                parts[1].replace(' ', '').replace('\t', '')
                if len(parts) > 1
                else ''
            )

        sequences[seq_id] = seq_data
        current_line += 1

    # Check if interleaved (more lines after initial block)
    if current_line < len(lines):
        # Interleaved format: read additional blocks
        seq_ids = list(sequences.keys())

        while current_line < len(lines):
            for seq_id in seq_ids:
                if current_line >= len(lines):
                    break

                line = lines[current_line].replace(' ', '').replace('\t', '')
                sequences[seq_id] += line
                current_line += 1

    # Create Sequence objects
    seq_list = []
    count = 0

    for seq_id, seq_data in sequences.items():
        if len(seq_data) != nchar:
            raise ValueError(
                f"Sequence {seq_id} length {len(seq_data)} doesn't match expected {nchar}"
            )

        seq = Sequence(id=seq_id, data=seq_data.upper())

        # Validation happens automatically in Sequence.__init__

        seq_list.append(seq)

        count += 1
        if progress_callback:
            progress_callback(count, ntax)

    alignment = Alignment(seq_list)

    if self.validate:
        alignment.validate()

    return alignment

PhylipWriter

Writer for PHYLIP format sequence files.

Source code in src/pypopart/io/phylip.py
class PhylipWriter:
    """Writer for PHYLIP format sequence files."""

    def __init__(
        self,
        filepath: Union[str, Path],
        strict: bool = False,
        interleaved: bool = False,
        line_length: int = 60,
        compress: Optional[str] = None,
    ):
        """
        Initialize PHYLIP writer.

        Parameters
        ----------
        filepath :
            Output file path.
        strict :
            Whether to use strict format (10-char IDs).
        interleaved :
            Whether to write in interleaved format.
        line_length :
            Line length for interleaved format.
        compress :
            Compression format ('gzip' or None).
        """
        self.filepath = Path(filepath)
        self.strict = strict
        self.interleaved = interleaved
        self.line_length = line_length
        self.compress = compress

        if compress == 'gzip' and not str(self.filepath).endswith('.gz'):
            self.filepath = Path(str(self.filepath) + '.gz')

    def _open_file(self) -> TextIO:
        """Open file for writing with optional compression."""
        if self.compress == 'gzip':
            return gzip.open(self.filepath, 'wt')
        else:
            return open(self.filepath, 'w')

    def write_alignment(self, alignment: Alignment, progress_callback=None) -> None:
        """
        Write alignment to PHYLIP file.

        Parameters
        ----------
        alignment :
            Alignment object.
        progress_callback :
            Optional callback function(current, total).
        """
        with self._open_file() as handle:
            # Write header
            handle.write(f' {len(alignment)} {alignment.length}\n')

            if self.interleaved:
                # Interleaved format
                num_blocks = (
                    alignment.length + self.line_length - 1
                ) // self.line_length

                for block in range(num_blocks):
                    start = block * self.line_length
                    end = min(start + self.line_length, alignment.length)

                    for i, seq in enumerate(alignment):
                        if block == 0:
                            # First block: include ID
                            if self.strict:
                                seq_id = seq.id[:10].ljust(10)
                            else:
                                seq_id = seq.id.ljust(20)
                            handle.write(f'{seq_id} {seq.data[start:end]}\n')
                        else:
                            # Subsequent blocks: just sequence data
                            handle.write(f'{seq.data[start:end]}\n')

                        if progress_callback:
                            progress_callback(i + 1, len(alignment))

                    # Blank line between blocks
                    if block < num_blocks - 1:
                        handle.write('\n')
            else:
                # Sequential format
                for i, seq in enumerate(alignment):
                    if self.strict:
                        seq_id = seq.id[:10].ljust(10)
                    else:
                        seq_id = seq.id.ljust(20)

                    handle.write(f'{seq_id} {seq.data}\n')

                    if progress_callback:
                        progress_callback(i + 1, len(alignment))
__init__
__init__(
    filepath: Union[str, Path],
    strict: bool = False,
    interleaved: bool = False,
    line_length: int = 60,
    compress: Optional[str] = None,
)

Initialize PHYLIP writer.

Parameters:

Name Type Description Default
filepath Union[str, Path]

Output file path.

required
strict bool

Whether to use strict format (10-char IDs).

False
interleaved bool

Whether to write in interleaved format.

False
line_length int

Line length for interleaved format.

60
compress Optional[str]

Compression format ('gzip' or None).

None
Source code in src/pypopart/io/phylip.py
def __init__(
    self,
    filepath: Union[str, Path],
    strict: bool = False,
    interleaved: bool = False,
    line_length: int = 60,
    compress: Optional[str] = None,
):
    """
    Initialize PHYLIP writer.

    Parameters
    ----------
    filepath :
        Output file path.
    strict :
        Whether to use strict format (10-char IDs).
    interleaved :
        Whether to write in interleaved format.
    line_length :
        Line length for interleaved format.
    compress :
        Compression format ('gzip' or None).
    """
    self.filepath = Path(filepath)
    self.strict = strict
    self.interleaved = interleaved
    self.line_length = line_length
    self.compress = compress

    if compress == 'gzip' and not str(self.filepath).endswith('.gz'):
        self.filepath = Path(str(self.filepath) + '.gz')
write_alignment
write_alignment(
    alignment: Alignment, progress_callback=None
) -> None

Write alignment to PHYLIP file.

Parameters:

Name Type Description Default
alignment Alignment

Alignment object.

required
progress_callback

Optional callback function(current, total).

None
Source code in src/pypopart/io/phylip.py
def write_alignment(self, alignment: Alignment, progress_callback=None) -> None:
    """
    Write alignment to PHYLIP file.

    Parameters
    ----------
    alignment :
        Alignment object.
    progress_callback :
        Optional callback function(current, total).
    """
    with self._open_file() as handle:
        # Write header
        handle.write(f' {len(alignment)} {alignment.length}\n')

        if self.interleaved:
            # Interleaved format
            num_blocks = (
                alignment.length + self.line_length - 1
            ) // self.line_length

            for block in range(num_blocks):
                start = block * self.line_length
                end = min(start + self.line_length, alignment.length)

                for i, seq in enumerate(alignment):
                    if block == 0:
                        # First block: include ID
                        if self.strict:
                            seq_id = seq.id[:10].ljust(10)
                        else:
                            seq_id = seq.id.ljust(20)
                        handle.write(f'{seq_id} {seq.data[start:end]}\n')
                    else:
                        # Subsequent blocks: just sequence data
                        handle.write(f'{seq.data[start:end]}\n')

                    if progress_callback:
                        progress_callback(i + 1, len(alignment))

                # Blank line between blocks
                if block < num_blocks - 1:
                    handle.write('\n')
        else:
            # Sequential format
            for i, seq in enumerate(alignment):
                if self.strict:
                    seq_id = seq.id[:10].ljust(10)
                else:
                    seq_id = seq.id.ljust(20)

                handle.write(f'{seq_id} {seq.data}\n')

                if progress_callback:
                    progress_callback(i + 1, len(alignment))

pypopart.io.genbank

GenBank file format reader for PyPopART.

GenBankReader

Reader for GenBank format sequence files.

Source code in src/pypopart/io/genbank.py
class GenBankReader:
    """Reader for GenBank format sequence files."""

    def __init__(self, filepath: Union[str, Path], validate: bool = True):
        """
        Initialize GenBank reader.

        Parameters
        ----------
        filepath :
            Path to GenBank file.
        validate :
            Whether to validate sequences.
        """
        self.filepath = Path(filepath)
        self.validate = validate

        if not self.filepath.exists():
            raise FileNotFoundError(f'File not found: {filepath}')

    def _open_file(self) -> TextIO:
        """Open file handling gzip compression."""
        if self.filepath.suffix == '.gz':
            return gzip.open(self.filepath, 'rt')
        else:
            return open(self.filepath, 'r')

    def read_sequences(self, progress_callback=None) -> Iterator[Sequence]:
        """
        Read sequences from GenBank file.

        Parameters
        ----------
        progress_callback :
            Optional callback function(current, total).

        Yields :
            Sequence objects.
        """
        count = 0
        with self._open_file() as handle:
            for record in SeqIO.parse(handle, 'genbank'):
                # Extract metadata from annotations
                metadata = {}

                if record.annotations:
                    # Add relevant annotations as metadata
                    for key in ['organism', 'source', 'taxonomy', 'date']:
                        if key in record.annotations:
                            value = record.annotations[key]
                            if isinstance(value, list):
                                value = '; '.join(str(v) for v in value)
                            metadata[key] = str(value)

                # Add features as metadata if present
                if record.features:
                    feature_types = {f.type for f in record.features}
                    metadata['features'] = ', '.join(feature_types)

                seq = Sequence(
                    id=record.id,
                    data=str(record.seq).upper(),
                    description=record.description,
                    metadata=metadata,
                )

                # Validation happens automatically in Sequence.__init__

                count += 1
                if progress_callback:
                    progress_callback(count, None)

                yield seq
__init__
__init__(filepath: Union[str, Path], validate: bool = True)

Initialize GenBank reader.

Parameters:

Name Type Description Default
filepath Union[str, Path]

Path to GenBank file.

required
validate bool

Whether to validate sequences.

True
Source code in src/pypopart/io/genbank.py
def __init__(self, filepath: Union[str, Path], validate: bool = True):
    """
    Initialize GenBank reader.

    Parameters
    ----------
    filepath :
        Path to GenBank file.
    validate :
        Whether to validate sequences.
    """
    self.filepath = Path(filepath)
    self.validate = validate

    if not self.filepath.exists():
        raise FileNotFoundError(f'File not found: {filepath}')
read_sequences
read_sequences(
    progress_callback=None,
) -> Iterator[Sequence]

Read sequences from GenBank file.

Parameters:

Name Type Description Default
progress_callback

Optional callback function(current, total).

None
Yields

Sequence objects.

required
Source code in src/pypopart/io/genbank.py
def read_sequences(self, progress_callback=None) -> Iterator[Sequence]:
    """
    Read sequences from GenBank file.

    Parameters
    ----------
    progress_callback :
        Optional callback function(current, total).

    Yields :
        Sequence objects.
    """
    count = 0
    with self._open_file() as handle:
        for record in SeqIO.parse(handle, 'genbank'):
            # Extract metadata from annotations
            metadata = {}

            if record.annotations:
                # Add relevant annotations as metadata
                for key in ['organism', 'source', 'taxonomy', 'date']:
                    if key in record.annotations:
                        value = record.annotations[key]
                        if isinstance(value, list):
                            value = '; '.join(str(v) for v in value)
                        metadata[key] = str(value)

            # Add features as metadata if present
            if record.features:
                feature_types = {f.type for f in record.features}
                metadata['features'] = ', '.join(feature_types)

            seq = Sequence(
                id=record.id,
                data=str(record.seq).upper(),
                description=record.description,
                metadata=metadata,
            )

            # Validation happens automatically in Sequence.__init__

            count += 1
            if progress_callback:
                progress_callback(count, None)

            yield seq