Source code for cobra_db.deid

import logging
import os
from copy import deepcopy
from datetime import datetime
from typing import List, Union

import pydicom
from deid.config import DeidRecipe
from deid.dicom.parser import DicomParser
from deid.logger import bot

from cobra_db.encrypt import Hasher
from cobra_db.utils import parse_AS_as_int, parse_DA_TM_as_datetime

# Default recipe files live next to this module; both paths are built the same
# way, differing only in the file name.
base_recipe_path, mr_recipe_path = (
    os.path.join(os.path.dirname(__file__), recipe_name)
    for recipe_name in ("deid_recipe.txt", "deid_recipe_mr.txt")
)

# Maps a level name to the integer that `Deider.__init__` assigns to the deid
# logger (`bot.level`). Negative values are the error-side levels, positive
# values are increasingly verbose; INFO and CUSTOM share the same value.
deid_logging_levels = {
    "ABORT": -5,
    "FLAG": -4,
    "ERROR": -3,
    "WARNING": -2,
    "LOG": -1,
    "INFO": 1,
    "CUSTOM": 1,
    "QUIET": 0,
    "VERBOSE": 2,
    "VERBOSE2": 3,
    "VERBOSE3": 4,
    "DEBUG": 5,
}


class Deider:
    """Pseudonymize DICOM datasets by applying a deid recipe.

    The custom recipe functions defined on this class (name replacement,
    hashing, date truncation and numeric rounding) are registered on a deid
    ``DicomParser`` for every dataset that is pseudonymized.
    """

    def __init__(
        self,
        hasher_secret_salt: str,
        recipe_path: Union[str, List, None] = None,
        logging_level: str = "ERROR",
    ):
        """Deidentify datasets according to vaib recipe

        :param hasher_secret_salt: salt for hashing
        :param recipe_path: path to the deid recipe, or a list of such paths.
            When None, the default recipe at ``base_recipe_path`` is used.
        :param logging_level: key of ``deid_logging_levels``; the matching
            integer is assigned to the deid logger.
        :raises KeyError: if ``logging_level`` is not a known level name
        :raises AssertionError: if a recipe path does not exist
        """
        bot.level = deid_logging_levels[logging_level]
        if recipe_path is None:
            logging.warning(f"DeidDataset using default recipe {base_recipe_path}")
            recipe_path = base_recipe_path

        candidates = recipe_path if isinstance(recipe_path, list) else [recipe_path]
        for candidate in candidates:
            if not os.path.exists(candidate):
                # Raised explicitly instead of via `assert` so the check still
                # runs under `python -O`; the exception type is unchanged.
                raise AssertionError(f"Invalid recipe_path: {candidate}")

        self.recipe = DeidRecipe(recipe_path)
        self.hasher = Hasher(hasher_secret_salt)

    def pseudonymize(self, dataset: "pydicom.Dataset") -> "pydicom.Dataset":
        """Pseudonymize a single dicom dataset

        The parser works on a deep copy, so the dataset passed in is left
        untouched.

        :param dataset: dataset that will be pseudonymized
        :returns: pseudonymized dataset
        """
        dataset = deepcopy(dataset)
        parser = DicomParser(dataset, self.recipe)
        # Names under which the recipe refers to the custom functions below.
        parser.define("replace_name", self._replace_name)
        parser.define("hash_func", self._deid_hash_func)
        parser.define("remove_day", self._remove_day)
        parser.define("round_AS_to_nearest_5y", self._round_AS_to_nearest_5y)
        parser.define("round_DS_to_nearest_5", self._round_DS_to_nearest_5)
        parser.define("round_DS_to_nearest_0_05", self._round_DS_to_nearest_0_05)
        parser.parse(strip_sequences=False, remove_private=True)
        return parser.dicom

    @staticmethod
    def _remove_day(item, value, field, dicom):
        """Replaces the day of a ``YYYYMMDD`` date with ``01`` in the deid framework

        :returns: the date truncated to the first of its month, or "" when the
            element holds no value (empty string or None)
        :raises ValueError: if the value is not in ``YYYYMMDD`` format
        """
        date = field.element.value
        # Treat None like the empty string instead of crashing in strptime.
        if date is None or date == "":
            return ""
        dt = datetime.strptime(date, "%Y%m%d")
        return dt.strftime("%Y%m01")

    @staticmethod
    def _replace_name(item, value, field, dicom):
        """Builds a replacement name of the form ``<sex> <age>Y <modality>``

        Sex comes from PatientSex, age from PatientAge rounded to the nearest
        5 years, and modality from Modality. A missing or unrecognized
        PatientSex is reported as "Unk" rather than raising KeyError.
        """
        sex_labels = {"F": "Female", "M": "Male", "O": "Other", "": "Unk"}
        sex = sex_labels.get(dicom.get("PatientSex") or "", "Unk")
        age = Deider._round_to_nearest(parse_AS_as_int(dicom.get("PatientAge")), 5)
        return f"{sex} {age:03d}Y {dicom.get('Modality')}"

    @staticmethod
    def _round_to_nearest(value, interval):
        """Rounds value to closest multiple of interval"""
        return interval * round(value / interval)

    @staticmethod
    def _round_AS_to_nearest_5y(item, value, field, dicom):
        """Rounds age(AS) field to 5 year intervals in the deid framework

        :returns: the rounded age formatted as ``NNNY``, or "" when the age is
            empty and cannot be derived from StudyDate and PatientBirthDate
        """
        value = field.element.value
        # if age is empty, try to calculate it from the StudyDate and PatientBirthDate
        if value is None or value == "":
            try:
                study_date = dicom.get("StudyDate")
                study_date = parse_DA_TM_as_datetime(DA=study_date, TM="000000")
                birth_date = dicom.get("PatientBirthDate")
                birth_date = parse_DA_TM_as_datetime(DA=birth_date, TM="000000")
                age = (study_date - birth_date).days / 365
            except Exception as e:
                # Best effort: without usable dates the age is left empty.
                logging.error(e)
                return ""
        else:
            age = parse_AS_as_int(value)
        return f"{Deider._round_to_nearest(age, 5):03d}Y"

    @staticmethod
    def _round_DS_to_nearest_5(item, value, field, dicom):
        """Rounds field.element.value to increments of 5

        An element without a value (None or empty string) is treated as -1.
        """
        value = field.element.value
        if value is None or value == "":
            value = -1
        return f"{Deider._round_to_nearest(float(value), 5)}"

    @staticmethod
    def _round_DS_to_nearest_0_05(item, value, field, dicom) -> str:
        """Rounds field.element.value to increments of 0.05

        An element without a value (None or empty string) is treated as -1.
        The result is formatted with two decimals.
        """
        value = field.element.value
        if value is None or value == "":
            value = -1
        return f"{Deider._round_to_nearest(float(value), 0.05):.02f}"

    def _deid_hash_func(self, item, value, field, dicom) -> str:
        """Hashes ``str(field.element.value)`` with this instance's hasher"""
        val = field.element.value
        return self.hasher.hash(str(val))