Source code for pymchelper.writers.mcpl
import logging
from pathlib import Path
import struct
import numpy as np
import pymchelper
from pymchelper.page import Page
from pymchelper.shieldhit.detector.detector_type import SHDetType
from pymchelper.writers.writer import Writer
logger = logging.getLogger(__name__)
[docs]
class MCPLWriter(Writer):
    """MCPL data writer"""
    def __init__(self, output_path: str, _):
        super().__init__(output_path)
        self.output_path = self.output_path.with_suffix(".mcpl")
[docs]
    def write_single_page(self, page: Page, output_path: Path):
        """TODO"""
        logger.info("Writing page to: %s", str(output_path))
        # special case for MCPL data
        if page.dettyp == SHDetType.mcpl:
            # first part of the header
            header_bytes = "MCPL".encode('ascii')  # magic number
            header_bytes += "003".encode('ascii')  # version
            header_bytes += "L".encode('ascii')  # little endian
            header_bytes += struct.pack("<Q", page.data.shape[1])  # number of particles
            header_bytes += struct.pack("<I", 0)  # number of custom comments
            header_bytes += struct.pack("<I", 0)  # number of custom binary blobs
            header_bytes += struct.pack("<I", 0)  # user flags disabled
            header_bytes += struct.pack("<I", 0)  # polarisation disabled
            header_bytes += struct.pack("<I", 1)  # single precision for floats
            header_bytes += struct.pack("<i", 0)  # all particles have PDG code
            header_bytes += struct.pack("<I", 32)  # data length
            header_bytes += struct.pack("<I", 1)  # universal weight
            # second part of the header
            header_bytes += struct.pack("<d", 1)  # universal weight value
            # data arrays
            source_name = f"pymchelper {pymchelper.__version__}"
            header_bytes += struct.pack("<I", len(source_name))  # length of the source name
            header_bytes += source_name.encode('ascii')  # source name
            # particle data
            # need to fix the structure according to MCPL format
            # see https://mctools.github.io/mcpl/mcpl.pdf#nameddest=section.3
            # we use numpy to speed up the process for large arrays
            # Create a structured array with named fields, as some of the fields have different types
            dt = np.dtype([('x', np.float32), ('y', np.float32), ('z', np.float32), ('fp1', np.float32),
                           ('fp2', np.float32), ('uz', np.float32), ('time', np.float32), ('pdg', np.uint32)])
            data_bytes = np.empty(page.data.shape[1], dtype=dt)
            # Assign values to the fields
            data_bytes['x'] = page.data[1]
            data_bytes['y'] = page.data[2]
            data_bytes['z'] = page.data[3]
            data_bytes['fp1'] = page.data[4]  # ux by default
            data_bytes['fp2'] = page.data[5]  # uy by default
            data_bytes['time'] = 0
            data_bytes['pdg'] = page.data[0]
            ux2 = page.data[4]**2
            uy2 = page.data[5]**2
            uz2 = page.data[6]**2
            sign = np.ones_like(page.data[6], dtype=int)
            # find the maximum component of the direction vector
            # condition below defines the case where the maximum component is ux
            condition_1 = np.logical_and(ux2 >= uy2, ux2 > uz2)
            # condition below defines the case where the maximum component is uy
            condition_2 = np.logical_and(uy2 > ux2, uy2 > uz2)
            # by exclusion, the remaining case is where the maximum component is uz
            condition_3 = np.logical_not(np.logical_or(condition_1, condition_2))
            # fill the arrays according to the maximum component
            # lets start with the case where the maximum component is uz
            sign[(page.data[6] < 0) & condition_3] = -1
            # fill the arrays according to the maximum component ux
            sign[(page.data[4] < 0) & condition_1] = -1  # negative sign of ux
            data_bytes['fp1'][condition_1] = 1 / page.data[6][condition_1]  # 1/uz
            data_bytes['fp2'][condition_1] = page.data[5][condition_1]  # uy
            # fill the arrays according to the maximum component uy
            sign[(page.data[5] < 0) & condition_2] = -1  # sign of uy
            data_bytes['fp1'][condition_2] = page.data[4][condition_2]  # ux
            data_bytes['fp2'][condition_2] = 1 / page.data[6][condition_2]  # 1/uz
            data_bytes['uz'] = sign * page.data[7]
            data_bytes = data_bytes.tobytes()
            output_path.write_bytes(header_bytes + data_bytes)
            return