import logging
import os
import sys
from argparse import ArgumentParser, Namespace
from multiprocessing import Pool
from time import sleep
from typing import List, Union
import pydicom
from bson import ObjectId
from pyaml_env import parse_config
from pydicom.errors import BytesLengthException
from pymongo.errors import DuplicateKeyError
from tqdm import tqdm
from cobra_db.dataset_mod import DatasetMod
from cobra_db.deid import Deider, base_recipe_path, mr_recipe_path
from cobra_db.filesystem import get_instance_path
from cobra_db.model import FileSource, ImageMetadata
from cobra_db.mongo_dao import Connector, ImageMetadataDao
from cobra_db.scripts.utils import add_args_to_iterable, batcher
def parse_arguments(raw_args: List[str]) -> Namespace:
    """Parse the arguments for the pseudonymization process.

    The only command line argument is the path to a config file. That file is
    loaded with ``parse_config`` and its top-level entries become attributes of
    the returned namespace.

    :param raw_args: arguments from sys.argv[1:]
    :return: a namespace with the required cfg
    """
    parser = ArgumentParser(
        "Pseudonymize ImageMetadata",
        description="""
        Given an ImageMetadata collection with real data, create a second database with the
        pseudonymized version. The files will be pseudonymized and stored according to the
        suggested filesystem paths. Additionally, docs that are correctly pseudonymized will
        be updated with a `deid_file_source` field that allows to check where the data went.""",
    )
    parser.add_argument(
        "config_path",
        type=str,
        # Implicit concatenation keeps exactly one space between the sentences;
        # a backslash continuation inside the literal would not.
        help=(
            "the config file to be used. "
            "For an example of the config file, please read the documentation "
            "how-to guide"
        ),
    )
    args = parser.parse_args(raw_args)
    return Namespace(**parse_config(args.config_path, default_value=None))
def process_batch(args):
    """Process one batch of image metadata using its own database connections.

    :param args: a ``(im_metas, cfg)`` pair. ``cfg.src_mongo`` and
        ``cfg.dst_mongo`` are expanded as keyword arguments of ``Connector``.
    :return: ``(seen, processed)`` where ``seen`` is the number of items
        iterated and ``processed`` is the sum of the values returned by
        ``process_im_meta``
    """
    im_metas, cfg = args
    seen = 0
    processed = 0
    src_im_dao = ImageMetadataDao(Connector(**cfg.src_mongo))
    try:
        dst_im_dao = ImageMetadataDao(Connector(**cfg.dst_mongo))
        try:
            for im_meta in im_metas:
                seen += 1
                try:
                    processed += process_im_meta(
                        im_meta, cfg, src_im_dao, dst_im_dao
                    )
                except IndexError as err:
                    # Still best-effort (the batch keeps going), but leave a
                    # trace instead of dropping the image silently. The
                    # metadata itself is not logged: it is not yet
                    # pseudonymized at this point.
                    logging.warning(
                        "Skipping item %d of the batch after IndexError: %s",
                        seen,
                        err,
                    )
        finally:
            dst_im_dao.connector.close()
    finally:
        # Nested try/finally so the source connection is released even when
        # opening or closing the destination connection fails.
        src_im_dao.connector.close()
    return seen, processed
def get_required_drive_names(src_mongo, query):
    """Find which drives hold the images matched by ``query``.

    Runs a ``$match`` + ``$sortByCount`` aggregation on
    ``file_source.drive_name`` over the ImageMetadata collection.

    :param src_mongo: keyword arguments for ``Connector``
    :param query: filter used in the ``$match`` stage
    :return: ``(drive_names, images)``; the distinct drive names in the order
        returned by the aggregation and the total number of matched documents
    """
    connector = Connector(**src_mongo)
    try:
        im_dao = ImageMetadataDao(connector)
        logging.info("Searching images to obtain required drives for query")
        print(f"Running query: {query}")
        cursor = im_dao.collection.aggregate(
            [{"$match": query}, {"$sortByCount": "$file_source.drive_name"}]
        )
        drive_names = []
        images = 0
        for doc in cursor:
            drive_names.append(doc["_id"])
            images += doc["count"]
    finally:
        # Release the connection once the cursor is consumed (or on error),
        # the same way process_batch closes its connectors.
        connector.close()
    return drive_names, images
def check_mount_paths(mount_paths, required_drive_names, dst_drive_name):
    """Verify that every drive needed for the run has a configured mount path.

    The checks raise ``AssertionError`` explicitly (instead of using ``assert``
    statements) so they are still enforced when Python runs with ``-O``.

    :param mount_paths: mapping of drive name to mount path
    :param required_drive_names: drive names that must be keys of ``mount_paths``
    :param dst_drive_name: destination drive; must map to a non-None value
    :raises AssertionError: if a required drive or the destination drive is
        not configured
    """
    missing = set(required_drive_names) - set(mount_paths.keys())
    if missing:
        raise AssertionError(f"Missing configuration for drive_names: {missing}")
    if mount_paths.get(dst_drive_name) is None:
        raise AssertionError("Missing dst_drive_name")
def single_proc(batches, n_proc=1):
    """Lazily run ``process_batch`` on each batch in the current process.

    :param batches: iterable of arguments accepted by ``process_batch``
    :param n_proc: unused; kept so the signature matches ``multi_proc``
    :return: generator of ``(seen, processed)`` tuples, one per batch, in order
    """
    outcomes = (process_batch(work_item) for work_item in batches)
    for n_seen, n_processed in outcomes:
        yield n_seen, n_processed
def multi_proc(batches, n_proc):
    """Run ``process_batch`` over the batches in a pool of worker processes.

    Results are yielded as soon as any worker finishes, so their order does not
    necessarily follow the order of ``batches``.

    :param batches: iterable of arguments accepted by ``process_batch``
    :param n_proc: number of worker processes
    :return: generator of ``(seen, processed)`` tuples, one per batch
    """
    # The context manager tears the pool down when the generator is exhausted,
    # closed early, or an error propagates, so workers are not left behind.
    with Pool(n_proc) as pool:
        for seen, processed in pool.imap_unordered(process_batch, batches):
            yield seen, processed
def query_mux(query, image_ids: "Union[List[ObjectId], None]" = None):
    """Select one of query or image_ids.

    The check raises ``AssertionError`` explicitly (instead of using an
    ``assert`` statement) so a conflicting query cannot be silently overwritten
    when Python runs with ``-O``.

    :param query: filter to use when no ``image_ids`` are given
    :param image_ids: if not None, ids to select with an ``_id``/``$in`` filter;
        ``query`` must then be ``{}``
    :return: ``query`` unchanged, or the ``$in`` filter built from ``image_ids``
    :raises AssertionError: if ``image_ids`` is given and ``query`` is not ``{}``
    """
    if image_ids is None:
        return query
    if query != {}:
        raise AssertionError(
            "Cannot have a query and a list of images at the same time"
        )
    return {"_id": {"$in": image_ids}}
def recipe_mux(base: bool, mr: bool, recipe: Union[str, List[str], None]):
    """Select the correct recipe according to the configurations.

    :param base: if true, include ``base_recipe_path``
    :param mr: if true, include ``mr_recipe_path``
    :param recipe: an extra recipe path, a list of extra recipe paths, or any
        other value (e.g. None) to add nothing
    :return: the selected paths in the order base, mr, extra; None when no
        recipe was selected
    :raises AssertionError: if a selected path does not exist
    """
    recipes = []
    if base:
        print(f"Using VAIB recipe from: {base_recipe_path}")
        recipes.append(base_recipe_path)
    if mr:
        print(
            f"Using additional to VAIB recipe targeted to MR studies from: {mr_recipe_path}"
        )
        recipes.append(mr_recipe_path)
    if isinstance(recipe, str):
        recipes.append(recipe)
    elif isinstance(recipe, list):
        recipes.extend(recipe)
    for path in recipes:
        # Raised explicitly so the check is still enforced under ``python -O``.
        if not os.path.exists(path):
            raise AssertionError(f"Deid recipe path does not exist: {path}")
    if not recipes:
        return None
    logging.info("Using recipes: %s", recipes)
    return recipes
def main(args=None, image_ids: "Union[List[ObjectId], None]" = None):
    """Pseudonymize the images selected by the config file.

    Steps: read the config, check that all the required drives are configured,
    list the images to process, build the deidentifier and process the images
    in batches while reporting progress.

    :param args: command line arguments; defaults to ``sys.argv[1:]``
    :param image_ids: optional list of image ids to process instead of the
        query from the config file (see ``query_mux``)
    :raises ValueError: if ``cfg.n_proc`` is smaller than 1
    """
    if args is None:
        # Allows testing main from pytest.
        args = sys.argv[1:]
    # read config file
    cfg = parse_arguments(args)
    # Validate n_proc before touching the database so that a bad value fails
    # fast instead of after the whole image list has been loaded.
    if cfg.n_proc < 1:
        raise ValueError("n_proc must be bigger than 0")
    process_func = single_proc if cfg.n_proc == 1 else multi_proc
    cfg.query = query_mux(cfg.query, image_ids)
    # Check how many files will be processed and if the cfg is enough
    required_drive_names, total_imgs = get_required_drive_names(
        cfg.src_mongo, cfg.query
    )
    check_mount_paths(cfg.mount_paths, required_drive_names, cfg.dst_drive_name)
    im_meta_gen = im_meta_generator(cfg.src_mongo, cfg.query)
    logging.info("Saving list of files to be processed")
    imgs = list(
        tqdm(im_meta_gen, desc="Reading list of files to be processed")
    )  # save as list to avoid losing the cursor
    batches = batcher(imgs, batch_size=cfg.batch_size)
    # Due to a weird configuration system of the deid library this has to be loaded
    # after setting the MESSAGELEVEL env var.
    os.environ["MESSAGELEVEL"] = cfg.logging_level
    cfg.deider = Deider(
        cfg.hash_secret,
        recipe_mux(
            cfg.deid_default_recipes["base"],
            cfg.deid_default_recipes["mr"],
            cfg.user_recipe_path,
        ),
        logging_level=cfg.logging_level,
    )
    batches = add_args_to_iterable(batches, cfg)
    pbar_seen = tqdm(total=total_imgs, desc="Images seen", smoothing=0.001)
    pbar_processed = tqdm(total=total_imgs, desc="Images processed", smoothing=0.001)
    try:
        for seen, processed in process_func(batches, cfg.n_proc):
            pbar_processed.update(processed)
            pbar_seen.update(seen)
    finally:
        # Close the bars so their final state is flushed even if a batch fails.
        pbar_processed.close()
        pbar_seen.close()