# Source code for pymchelper.writers.mcpl
import logging
from pathlib import Path
import struct
import numpy as np
import pymchelper
from pymchelper.page import Page
from pymchelper.shieldhit.detector.detector_type import SHDetType
from pymchelper.writers.writer import Writer
logger = logging.getLogger(__name__)
# [docs]
class MCPLWriter(Writer):
    """Writer storing particle phase-space data in the binary MCPL format.

    MCPL ("Monte Carlo Particle List") is an interchange format for particle
    state data, specified at https://mctools.github.io/mcpl/mcpl.pdf
    """

    # On-disk layout of one 32-byte particle record: position (x, y, z), the
    # two floats of the packed direction vector, kinetic energy (which also
    # carries the sign bit of the packed direction), time and the PDG code.
    # NOTE: MCPL defines the PDG code as *signed* int32 - antiparticles have
    # negative codes, so a uint32 field would corrupt them.
    _PARTICLE_DTYPE = np.dtype([('x', np.float32), ('y', np.float32), ('z', np.float32),
                                ('fp1', np.float32), ('fp2', np.float32),
                                ('ekin', np.float32), ('time', np.float32),
                                ('pdg', np.int32)])

    def __init__(self, output_path: str, _):
        """Remember the output location, forcing a ``.mcpl`` file suffix.

        The second, unused argument keeps the signature compatible with the
        common Writer construction interface.
        """
        super().__init__(output_path)
        self.output_path = self.output_path.with_suffix(".mcpl")

    def write_single_page(self, page: Page, output_path: Path):
        """Write a single page of MCPL scorer data to ``output_path``.

        Pages of any other detector type are silently ignored.
        """
        logger.info("Writing page to: %s", str(output_path))
        # only the special MCPL scorer produces raw phase-space data
        if page.dettyp != SHDetType.mcpl:
            return
        payload = self._header_bytes(page.data.shape[1]) + self._particle_bytes(page.data)
        output_path.write_bytes(payload)

    @staticmethod
    def _header_bytes(particle_count: int) -> bytes:
        """Return the MCPL format-version-3 file header for ``particle_count`` particles."""
        header = b"MCPL"  # magic number
        header += b"003"  # format version
        header += b"L"  # little endian
        header += struct.pack("<Q", particle_count)  # number of particles
        header += struct.pack("<I", 0)  # number of custom comments
        header += struct.pack("<I", 0)  # number of custom binary blobs
        header += struct.pack("<I", 0)  # user flags disabled
        header += struct.pack("<I", 0)  # polarisation disabled
        header += struct.pack("<I", 1)  # single precision for floats
        header += struct.pack("<i", 0)  # 0: every particle carries its own PDG code
        header += struct.pack("<I", 32)  # per-particle record length in bytes
        header += struct.pack("<I", 1)  # universal weight enabled ...
        header += struct.pack("<d", 1)  # ... with value 1.0
        source_name = f"pymchelper {pymchelper.__version__}"
        header += struct.pack("<I", len(source_name))  # length of the source name
        header += source_name.encode('ascii')  # source name
        return header

    @classmethod
    def _particle_bytes(cls, data: np.ndarray) -> bytes:
        """Pack the particle array into 32-byte MCPL on-disk records.

        Rows of ``data`` are assumed to be (as used below): 0 -> PDG code,
        1-3 -> x, y, z, 4-6 -> direction cosines ux, uy, uz, 7 -> kinetic
        energy; columns are individual particles.

        The unit direction vector is compressed into two floats plus a sign
        bit following the "Adaptive Projection Packing" scheme of the MCPL
        specification (https://mctools.github.io/mcpl/mcpl.pdf, section 3):
        the numerically largest component is dropped and reconstructed by
        the reader from normalisation, with its sign transferred onto the
        kinetic energy field.  When ux or uy is the dropped component, uz is
        stored as 1/uz (magnitude >= 1), which tells the reader which
        component is missing.
        """
        records = np.empty(data.shape[1], dtype=cls._PARTICLE_DTYPE)
        records['x'] = data[1]
        records['y'] = data[2]
        records['z'] = data[3]
        records['time'] = 0
        records['pdg'] = data[0]

        ux, uy, uz = data[4], data[5], data[6]
        ux2, uy2, uz2 = ux**2, uy**2, uz**2
        # classify each particle by its dominant direction component
        x_dominant = np.logical_and(ux2 >= uy2, ux2 > uz2)
        y_dominant = np.logical_and(uy2 > ux2, uy2 > uz2)
        # by exclusion, everything else has uz as the dominant component
        z_dominant = np.logical_not(np.logical_or(x_dominant, y_dominant))

        # default layout (uz dominant): store ux and uy verbatim
        records['fp1'] = ux
        records['fp2'] = uy
        # uz may be exactly zero in the branches below; 1/0 -> inf is still a
        # valid packed value (it unpacks back to uz == 0), so only numpy's
        # divide-by-zero warning needs silencing
        with np.errstate(divide='ignore'):
            records['fp1'][x_dominant] = 1 / uz[x_dominant]  # 1/uz
            records['fp2'][y_dominant] = 1 / uz[y_dominant]  # 1/uz
        records['fp2'][x_dominant] = uy[x_dominant]  # uy
        records['fp1'][y_dominant] = ux[y_dominant]  # ux

        # sign of the dropped (dominant) component, carried by the energy field
        sign = np.ones_like(uz, dtype=int)
        sign[(ux < 0) & x_dominant] = -1
        sign[(uy < 0) & y_dominant] = -1
        sign[(uz < 0) & z_dominant] = -1
        records['ekin'] = sign * data[7]
        return records.tobytes()