Terminology mapper

Provide the main terminology-mapping orchestration utilities for the package.

This module defines the high-level mapping workflow that converts source concepts into standardized concepts using a configurable pipeline composed of translation, retrieval, reranking, and selection stages. It also includes a rate-limiting helper and utilities for loading mapping tasks from structured configuration objects.
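The mapping loop in the source code below chains these stages with the `|` operator (for example `batch | self.translator`). The following is a minimal sketch of how that kind of chaining can work in plain Python. It assumes each stage implements the reflected `__ror__` method. The `Stage` class and the toy lambdas are hypothetical illustrations, not aatm's actual base classes.

```python
from typing import Any, Callable, List


class Stage:
    """Hypothetical pipeline stage that runs when used as ``batch | stage``."""

    def __init__(self, fn: Callable[[List[Any]], List[Any]]) -> None:
        self.fn = fn

    def __ror__(self, batch: List[Any]) -> List[Any]:
        # ``list`` defines no ``__or__`` for Stage, so Python falls back to this
        # reflected method and passes the left-hand batch in.
        return self.fn(batch)


# Toy stages standing in for translation, retrieval, reranking and selection.
translate = Stage(lambda batch: [text.lower() for text in batch])
retrieve = Stage(
    # Each input gets a list of (candidate, distance) pairs.
    lambda batch: [[(text + " (broader)", 0.4), (text, 0.1)] for text in batch]
)
rerank = Stage(lambda batch: [sorted(cands, key=lambda c: c[1]) for cands in batch])
select = Stage(lambda batch: [cands[0][0] if cands else None for cands in batch])

# ``|`` is left-associative: ((((batch | translate) | retrieve) | rerank) | select)
mapped = ["Hypertension", "Type 2 Diabetes"] | translate | retrieve | rerank | select
```

In this design, each stage receives the whole batch at once. A stage that calls a remote model can therefore send one request per batch instead of one per item.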

The main entry point is TerminologyMapper, which can read OMOP-like source concept files, process them in batches, and write the mapped output to disk.
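Before mapping a CSV file, the mapper renames columns via `column_mapping`, checks that the required OMOP-style columns are present, applies `limit_to`, and drops rows without a `source_code_description`. The sketch below is a hypothetical stdlib-only analogue of those steps. The real method uses pandas and builds `SourceConcept` objects, so `load_source_rows` and its dict output are assumptions made for illustration.

```python
import csv
import io
from typing import Dict, List, Optional

# Required columns, copied from ``TerminologyMapper.expected_columns``.
EXPECTED_COLUMNS = {
    "source_code",
    "source_concept_id",
    "source_vocabulary_id",
    "source_code_description",
    "valid_start_date",
    "valid_end_date",
    "invalid_reason",
}


def load_source_rows(
    text: str,
    column_mapping: Optional[Dict[str, str]] = None,
    limit_to: Optional[int] = None,
) -> List[Dict[str, str]]:
    """Hypothetical stdlib-only analogue of ``map_csv_to_source_concepts``."""
    reader = csv.DictReader(io.StringIO(text))
    mapping = column_mapping or {}
    # Rename the header first, then check that every required column exists.
    header = {mapping.get(name, name) for name in reader.fieldnames or []}
    missing = EXPECTED_COLUMNS - header
    if missing:
        raise ValueError(f"Missing required columns: {sorted(missing)}")

    rows: List[Dict[str, str]] = []
    for index, row in enumerate(reader):
        if limit_to is not None and index >= limit_to:
            break  # the row limit counts rows before any are dropped
        renamed = {mapping.get(key, key): value or "" for key, value in row.items()}
        if not renamed["source_code_description"]:
            continue  # drop rows with an empty description
        rows.append(renamed)
    return rows
```

As in the mapper, `limit_to` is applied before empty descriptions are dropped. A limit of 2 can therefore yield fewer than two usable rows.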

TerminologyMapper

Coordinate end-to-end terminology mapping through a configurable pipeline.

This class orchestrates the full mapping workflow for source concepts, including optional translation, candidate retrieval, reranking, final selection, batching, rate limiting, and output generation. It is designed to work with OMOP-style source concept files and pluggable pipeline components.
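The mapping loop calls a module-level helper as `next_allowed_time = rate_limit(len(batch), next_allowed_time, self.rate_limit)`, where `rate_limit` is the number of items allowed per minute. That helper's body is not shown in this section. The following is a hypothetical implementation that matches the call shape. It sleeps until the previous reservation expires, then reserves `60 / items_per_minute` seconds per item. The injectable `sleep` and `now` parameters are additions for testability.

```python
import time
from typing import Callable


def rate_limit(
    n_items: int,
    next_allowed_time: float,
    items_per_minute: int,
    sleep: Callable[[float], None] = time.sleep,
    now: Callable[[], float] = time.monotonic,
) -> float:
    """Block until ``next_allowed_time``, then reserve time for ``n_items``.

    Returns the monotonic timestamp at which the next batch may start.
    """
    current = now()
    if current < next_allowed_time:
        sleep(next_allowed_time - current)
        current = next_allowed_time
    # Each item consumes 60 / items_per_minute seconds of the budget.
    return current + n_items * 60.0 / items_per_minute
```

With 600 items per minute and batches of 100, the first batch starts immediately. Each later batch then waits until 10 seconds after the previous one started.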

Instances can be created directly by passing configured components or built from a structured TerminologyMappingTask configuration object.

Source code in aatm\terminology_mapper.py
class TerminologyMapper:
    """Coordinate end-to-end terminology mapping through a configurable pipeline.

    This class orchestrates the full mapping workflow for source concepts,
    including optional translation, candidate retrieval, reranking, final
    selection, batching, rate limiting, and output generation. It is designed
    to work with OMOP-style source concept files and pluggable pipeline
    components.

    Instances can be created directly by passing configured components or built
    from a structured ``TerminologyMappingTask`` configuration object.
    """

    def __init__(
        self,
        input_file: Optional[str | Path] = None,
        output_dir: str | Path = Path("output"),
        translator: Optional[BaseTranslator | str] = None,
        retriever: Optional[BaseRetriever | str] = None,
        selector: Optional[BaseSelector | str] = None,
        reranker: Optional[BaseReranker | str] = None,
        batch_size: Optional[int] = None,
        rate_limit: Optional[int] = None,
        column_mapping: Optional[dict] = None,
        limit_to: Optional[int] = None,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize the terminology mapper and its pipeline components.

        This constructor sets up the terminology-mapping pipeline. Components
        that are not provided fall back to built-in defaults: an
        ``EmptyTranslator`` (no translation), a ``ChromaDBRetriever`` over the
        ``"expressions"`` collection of a persistent ChromaDB client using the
        ``gemini-embedding-001`` Google embedding model, a
        ``FirstResultSelector``, and a plain ``PipelineBaseClass`` in place of
        a reranker. It also creates the output directory, stores batching and
        rate-limiting settings, and defines the expected input schema for
        source concept files.

        Args:
            input_file: Optional path to the source concept file to map.
            output_dir: Directory where mapping outputs will be written. It is
                created if it does not exist.
            translator: Optional translator component applied before
                retrieval. Expects a ``BaseTranslator`` instance or a
                translator ID from the registry.
            retriever: Optional retriever component used to fetch candidate
                concepts. Expects a ``BaseRetriever`` instance or a retriever
                ID from the registry.
            selector: Optional selector component used to choose the final
                mapped concept. Expects a ``BaseSelector`` instance or a
                selector ID from the registry.
            reranker: Optional reranker component used to reorder retrieved
                candidates before selection. Expects a ``BaseReranker``
                instance or a reranker ID from the registry.
            batch_size: Number of source concepts to process per batch.
                Defaults to 100.
            rate_limit: Optional maximum number of items to process per minute.
            column_mapping: Optional mapping from input column names to the
                expected OMOP-style column names.
            limit_to: Optional maximum number of input rows to process.
            *args: Additional positional arguments reserved for compatibility;
                currently ignored.
            **kwargs: Additional keyword arguments reserved for compatibility;
                currently ignored.

        Raises:
            TypeError: If a component is neither ``None``, a string ID, nor an
                instance of the corresponding base class.
        """
        # Define translator
        if translator is None:
            self.translator = EmptyTranslator()

        elif isinstance(translator, str):
            self.translator = load_translator(translator)

        elif isinstance(translator, BaseTranslator):
            self.translator = translator

        else:
            raise TypeError(
                f"Translator must be a BaseTranslator or str representing the translator name.  Given type: {type(translator)}."
            )

        # Define retriever
        if retriever is None:
            client = chromadb.PersistentClient()
            self.retriever = ChromaDBRetriever(
                client=client,
                collection_name="expressions",
                embedding_function=GoogleEmbeddingFunction(
                    model="gemini-embedding-001"
                ),
            )

        elif isinstance(retriever, str):
            self.retriever = load_retriever(retriever)

        elif isinstance(retriever, BaseRetriever):
            self.retriever = retriever

        else:
            raise TypeError(
                f"Retriever must be a BaseRetriever or str representing the retriever name.  Given type: {type(retriever)}."
            )

        # Define selector
        if selector is None:
            self.selector = FirstResultSelector()

        elif isinstance(selector, str):
            self.selector = load_selector(selector)

        elif isinstance(selector, BaseSelector):
            self.selector = selector

        else:
            raise TypeError(
                f"Selector must be a BaseSelector or str representing the selector name.  Given type: {type(selector)}."
            )

        # Define reranker
        if reranker is None:
            self.reranker = PipelineBaseClass()

        elif isinstance(reranker, str):
            self.reranker = load_reranker(reranker)

        elif isinstance(reranker, BaseReranker):
            self.reranker = reranker

        else:
            raise TypeError(
                f"Reranker must be a BaseReranker or str representing the reranker name.  Given type: {type(reranker)}."
            )

        # Other attributes
        if isinstance(output_dir, str):
            output_dir = Path(output_dir)

        self.input_file: Optional[Path] = Path(input_file) if input_file else None
        self.output_dir: Path = output_dir
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.batch_size: int = batch_size if batch_size else 100
        self.rate_limit: Optional[int] = rate_limit
        self.expected_columns: set[str] = set(
            [
                "source_code",
                "source_concept_id",
                "source_vocabulary_id",
                "source_code_description",
                "valid_start_date",
                "valid_end_date",
                "invalid_reason",
            ]
        )
        self.column_mapping = column_mapping
        self.limit_to = limit_to

    @classmethod
    def from_task_config(
        cls, task_config: TerminologyMappingTask
    ) -> "TerminologyMapper":
        """Create a terminology mapper from a task configuration object.

        This factory method resolves the configured translator, retriever,
        selector, and reranker from their registries and initializes a
        ``TerminologyMapper`` with the remaining task parameters.

        Args:
            task_config: Structured task configuration describing the mapping
                pipeline and runtime settings.

        Returns:
            A configured ``TerminologyMapper`` instance.
        """
        translator = (
            load_translator(task_config.translator_id)
            if task_config.translator_id
            else None
        )

        retriever = (
            load_retriever(task_config.retriever_id)
            if task_config.retriever_id
            else None
        )

        selector = (
            load_selector(task_config.selector_id) if task_config.selector_id else None
        )

        reranker = (
            load_reranker(task_config.reranker_id) if task_config.reranker_id else None
        )

        return cls(
            translator=translator,
            retriever=retriever,
            selector=selector,
            reranker=reranker,
            batch_size=task_config.batch_size,
            rate_limit=task_config.rate_limit,
            input_file=task_config.input_file,
            output_dir=task_config.output_dir,
            column_mapping=task_config.column_mapping,
            limit_to=task_config.limit_to,
        )

    @classmethod
    def from_task_request(
        cls,
        task_request: TerminologyMappingRequest,
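        api_config: Optional[APIConfig] = None,
    ) -> "TerminologyMapper":
        """Create a terminology mapper from an API task request.

        Component IDs from the request are passed to the constructor, which
        resolves them from their registries (``None`` selects the built-in
        default). Batch size and rate limit are taken from ``api_config`` when
        it is provided.

        Args:
            task_request: Request object carrying the translator, retriever,
                selector, and reranker IDs.
            api_config: Optional API configuration supplying ``batch_size`` and
                ``rate_limit``.

        Returns:
            A configured ``TerminologyMapper`` instance.
        """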
        return cls(
            translator=task_request.translator_id,
            retriever=task_request.retriever_id,
            selector=task_request.selector_id,
            reranker=task_request.reranker_id,
            batch_size=api_config.batch_size if api_config else None,
            rate_limit=api_config.rate_limit if api_config else None,
        )

    def map(
        self,
        expressions: Optional[List[str | SourceConcept]] = None,
        file_path: Optional[str | Path] = None,
        limit_to: Optional[int] = None,
        output_dir: Optional[str | Path] = None,
        return_as: Literal["df", "mapped_source_concepts"] = "df",
        save_to_disk: bool = True,
    ) -> pd.DataFrame | List[MappedSourceConcept]:
        """Map source concepts from expressions or a file to standardized concepts.

        This method takes source concepts either from a list of strings or
        ``SourceConcept`` objects or from a supported input file (currently CSV
        only), processes them in batches through the pipeline, and returns the
        mapped results. When ``save_to_disk`` is true, the mappings are also
        written to ``mapped_source_concepts.csv`` in the output directory.

        Args:
            expressions: Optional list of expressions to map. Expects a list of
                strings or ``SourceConcept`` objects. Takes precedence over
                ``file_path``.
            file_path: Optional path to the source concept file. If not
                provided, the mapper's configured input file is used.
            limit_to: Optional maximum number of rows to process from the
                source file. Defaults to the mapper's ``limit_to``.
            output_dir: Optional output directory override for this mapping
                operation.
            return_as: Return type. Options: ``"df"`` or
                ``"mapped_source_concepts"``.
            save_to_disk: Whether to save the results to disk.

        Returns:
            A pandas DataFrame containing the mapped source concepts when
            ``return_as="df"``, or the list of ``MappedSourceConcept`` objects
            when ``return_as="mapped_source_concepts"``.

        Raises:
            ValueError: If neither expressions nor a file path is available, or
                the file type is not supported.
        """
        file_path = self.input_file if file_path is None else file_path
        limit_to = self.limit_to if limit_to is None else limit_to

        if expressions is not None:
            assert all(
                isinstance(e, str) or isinstance(e, SourceConcept) for e in expressions
            ), "Expressions must be either strings or SourceConcept objects."

            source_concepts = [
                SourceConcept(source_code_description=e) if isinstance(e, str) else e
                for e in expressions
            ]

        elif file_path is not None:
            if isinstance(file_path, str):
                file_path = Path(file_path)
            if file_path.suffix == ".csv":
                source_concepts = self.map_csv_to_source_concepts(file_path, limit_to)
            else:
                raise ValueError(
                    f"Unsupported file type: {file_path.suffix}. File path provided: {file_path}"
                )
        else:
            raise ValueError("No expressions or file path provided.")

        assert return_as in ["df", "mapped_source_concepts"], (
            "Parameter return_as must be either 'df' or 'mapped_source_concepts'."
        )

        # map source concepts
        mapped_source_concepts: List[MappedSourceConcept] = []
        translated_source_concepts: List[Translation] = []
        next_allowed_time = time.monotonic()
        for batch_idx in track(
            range(0, len(source_concepts), self.batch_size),
            description=f"Mapping source concepts (batch_size = {self.batch_size})",
        ):
            batch = source_concepts[batch_idx : batch_idx + self.batch_size]

            # rate limit if defined
            if self.rate_limit is not None:
                next_allowed_time = rate_limit(
                    len(batch), next_allowed_time, self.rate_limit
                )

            translated_batch: List[Translation] = batch | self.translator
            selected_source_concepts: SelectorResults = (
                translated_batch | self.retriever | self.reranker | self.selector
            )
            translated_source_concepts.extend(translated_batch)
            mapped_source_concepts.extend(
                MappedSourceConcept.from_selector_results(
                    batch, selected_source_concepts, translated_batch
                )
            )

        mapped_source_concepts_df = None
        if save_to_disk:
            # write mapped source concepts to csv
            mapped_source_concepts_df = pd.DataFrame(
                [
                    mapped_source_concept.to_dict()
                    for mapped_source_concept in mapped_source_concepts
                ]
            )

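            # Honor the per-call override instead of always using self.output_dir.
            output_dir = (
                Path(output_dir) if output_dir is not None else self.output_dir
            )
            output_dir.mkdir(parents=True, exist_ok=True)

            mapped_source_concepts_df.to_csv(
                output_dir / "mapped_source_concepts.csv", index=False
            )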

        if return_as == "df":
            if mapped_source_concepts_df is None:
                mapped_source_concepts_df = pd.DataFrame(
                    [
                        mapped_source_concept.to_dict()
                        for mapped_source_concept in mapped_source_concepts
                    ]
                )
            return mapped_source_concepts_df

        elif return_as == "mapped_source_concepts":
            return mapped_source_concepts

    async def amap(
        self,
        file_path: Optional[str | Path] = None,
        limit_to: Optional[int] = None,
        return_confidence_scores: bool = True,
        output_dir: Optional[str | Path] = None,
    ) -> pd.DataFrame:
        """Asynchronously map source concepts from a file to standardized concepts.

        This method is the asynchronous counterpart to ``map()`` for file-based
        input. It processes source concepts in batches through the configured
        pipeline, awaiting each stage, then writes the mapped results to
        ``mapped_source_concepts.csv`` in the output directory and returns
        them as a DataFrame.

        Args:
            file_path: Optional path to the source concept file. If not
                provided, the mapper's configured input file is used.
            limit_to: Optional maximum number of rows to process from the
                source file. Defaults to the mapper's ``limit_to``.
            return_confidence_scores: Whether to add a ``confidence_score``
                column (``1 - distance`` of the selected candidate) to the
                returned DataFrame.
            output_dir: Optional output directory override for this mapping
                operation.

        Returns:
            A pandas DataFrame containing the mapped source concepts.

        Raises:
            ValueError: If no file path is available or the file type is not
                supported.
        """
        file_path = self.input_file if file_path is None else file_path
        limit_to = self.limit_to if limit_to is None else limit_to

        if file_path is not None:
            if isinstance(file_path, str):
                file_path = Path(file_path)
            if file_path.suffix == ".csv":
                source_concepts = self.map_csv_to_source_concepts(file_path, limit_to)
            else:
                raise ValueError(
                    f"Unsupported file type: {file_path.suffix}. File path provided: {file_path}"
                )
        else:
            raise ValueError("No file path provided")

        # map source concepts
        mapped_source_concepts: List[MappedSourceConcept] = []
        confidence_scores = []
        next_allowed_time = time.monotonic()
        for batch_idx in tqdm(
            range(0, len(source_concepts), self.batch_size),
            desc=f"Mapping source concepts (batch_size = {self.batch_size})",
        ):
            batch = source_concepts[batch_idx : batch_idx + self.batch_size]

            # rate limit if defined
            if self.rate_limit is not None:
                next_allowed_time = rate_limit(
                    len(batch), next_allowed_time, self.rate_limit
                )

            translated_batch = await (batch | self.translator)

            selected_source_concepts: SelectorResults = await (
                translated_batch | self.retriever | self.reranker | self.selector
            )
            mapped_source_concepts.extend(
                MappedSourceConcept.from_selector_results(
                    batch, selected_source_concepts
                )
            )
            confidence_scores.extend(
                [
                    1 - (selected_source_concepts.results[i].distance or 0)
                    if selected_source_concepts.results[i] is not None
                    else None
                    for i in range(len(selected_source_concepts.results))
                ]
            )

        # write mapped source concepts to csv
        mapped_source_concepts_df = pd.DataFrame(
            [
                mapped_source_concept.to_dict()
                for mapped_source_concept in mapped_source_concepts
            ]
        )

        if return_confidence_scores:
            mapped_source_concepts_df["confidence_score"] = confidence_scores

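        # Honor the per-call override instead of always using self.output_dir.
        output_dir = Path(output_dir) if output_dir is not None else self.output_dir
        output_dir.mkdir(parents=True, exist_ok=True)

        mapped_source_concepts_df.to_csv(
            output_dir / "mapped_source_concepts.csv", index=False
        )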

        return mapped_source_concepts_df

    def map_csv_to_source_concepts(
        self, file_path: Path, limit_to: int = None
    ) -> List[SourceConcept]:
        """Load source concepts from a CSV file and convert them to model objects.

        This method reads a CSV file, optionally limits the number of rows,
        applies any configured column renaming, validates that the required
        OMOP-style columns are present, drops rows with missing source concept
        descriptions, and converts the remaining rows into ``SourceConcept``
        objects.

        Args:
            file_path: Path to the CSV file containing source concepts.
            limit_to: Optional maximum number of rows to load from the file.

        Returns:
            A list of ``SourceConcept`` objects created from the CSV rows.

        Raises:
            ValueError: If the input file does not contain the required columns
                after optional column remapping.
        """
        df = pd.read_csv(file_path, on_bad_lines="skip")

        if limit_to is not None:
            df = df.iloc[:limit_to]

        # Rename columns
        if self.column_mapping is not None:
            df = df.rename(columns=self.column_mapping)

        # Check if all expected columns are present
        if not self.expected_columns.issubset(set(df.columns)):
            raise ValueError(
                f"This function expects a SOURCE_TO_CONCEPT_MAP table as defined by the official OMOP Common Data Model. It must include the following columns: {self.expected_columns}. Either rename the columns or provide the column_mapping argument mapping the current column names to the expected ones. Got columns: {set(df.columns)}"
            )

        # Check for null values
        if df["source_code_description"].isnull().any():
            console.print(
                f"[yellow]Attention:[/yellow] There are {df['source_code_description'].isnull().sum()} null values in the source_code_description column. Those rows will be dropped."
            )
            console.print(
                f"Dropped rows: {df[df['source_code_description'].isnull()].index.to_list()}"
            )
            df = df.dropna(subset=["source_code_description"])

        # Fill missing values before casting; casting first turns NaN into "nan".
        df = df.fillna("").astype(str)
        # Convert to SourceConcept objects
        source_concepts: List[SourceConcept] = [
            SourceConcept(**row) for row in df.to_dict("records")
        ]

        return source_concepts

    def __call__(self, expression: str) -> pd.DataFrame:
        """Invoke the mapper as a callable object.

        This method wraps a single expression in a list and delegates to
        ``map()``, so that mapper instances can be used like callable pipeline
        components.

        Args:
            expression: Input expression to map.

        Returns:
            A pandas DataFrame with the mapping for the given expression, as
            returned by ``map()`` with its default arguments (which also write
            the result to disk).
        """
        # Wrap in a list: map() iterates over ``expressions``, and iterating a
        # bare string would map each character separately.
        return self.map([expression])

    def __repr__(self) -> str:
        """Return the official string representation of the mapper.

        Returns:
            A string representation of the ``TerminologyMapper`` instance.
        """
        return f"{self.__class__.__name__}()"

    def __str__(self) -> str:
        """Return a human-readable string representation of the mapper.

        Returns:
            A string representation of the ``TerminologyMapper`` instance.
        """
        return f"{self.__class__.__name__}()"
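`amap()` awaits the pipeline batch by batch and derives each row's confidence score as `1 - distance` of the selected candidate. It treats a missing distance as 0 and a missing selection as `None`. The sketch below shows that loop with `asyncio`. `fake_select` and the `(name, distance)` tuple are hypothetical stand-ins for the real retriever, reranker, and selector stages and for `SelectorResults`.

```python
import asyncio
from typing import List, Optional, Tuple

Candidate = Tuple[str, float]  # (concept name, distance); hypothetical shape


async def fake_select(batch: List[str]) -> List[Optional[Candidate]]:
    """Hypothetical async stage standing in for retrieval, reranking, selection."""
    await asyncio.sleep(0)  # yield to the event loop, as a network call would
    return [(text.lower(), 0.25) if text else None for text in batch]


async def amap_sketch(
    items: List[str], batch_size: int
) -> List[Tuple[Optional[str], Optional[float]]]:
    results: List[Tuple[Optional[str], Optional[float]]] = []
    for start in range(0, len(items), batch_size):
        batch = items[start : start + batch_size]
        selected = await fake_select(batch)  # batches run one after another
        for candidate in selected:
            if candidate is None:
                results.append((None, None))  # nothing selected for this item
            else:
                name, distance = candidate
                # Same formula as amap(): a missing distance counts as 0.
                results.append((name, 1 - (distance or 0)))
    return results


output = asyncio.run(amap_sketch(["Fever", "", "Cough"], batch_size=2))
```

Batches are still processed sequentially here. Any concurrency comes from inside each awaited stage, not from overlapping batches.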

__init__(input_file=None, output_dir=Path('output'), translator=None, retriever=None, selector=None, reranker=None, batch_size=None, rate_limit=None, column_mapping=None, limit_to=None, *args, **kwargs)

Initialize the terminology mapper and its pipeline components.

This constructor sets up the terminology-mapping pipeline, defaulting to built-in translator, retriever, selector, and reranker behaviors when custom components are not provided. It also prepares output paths, stores batching and rate-limiting settings, and defines the expected input schema for source concept files.
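The constructor applies one rule to each component. `None` selects the built-in default, a string is looked up in a registry, an instance of the matching base class is used as-is, and anything else raises `TypeError`. Below is a generic sketch of that rule. `resolve_component`, the dict-based registry, its `ValueError` for unknown IDs, and the toy selector classes are all hypothetical. Note that `isinstance` takes the object first and the class second.

```python
from typing import Callable, Dict, Optional, Type, TypeVar, Union

T = TypeVar("T")


def resolve_component(
    value: Optional[Union[T, str]],
    base_cls: Type[T],
    registry: Dict[str, Callable[[], T]],
    default: Callable[[], T],
) -> T:
    """Resolve None / registry id / instance into a pipeline component."""
    if value is None:
        return default()  # built-in default component
    if isinstance(value, str):
        try:
            return registry[value]()  # build the registered component
        except KeyError:
            raise ValueError(f"Unknown {base_cls.__name__} id: {value!r}") from None
    if isinstance(value, base_cls):  # object first, class second
        return value
    raise TypeError(f"Expected {base_cls.__name__} or str, got {type(value)}")


# Toy classes for illustration only.
class BaseSelector: ...
class FirstResultSelector(BaseSelector): ...
class TopKSelector(BaseSelector): ...


selector = resolve_component("top_k", BaseSelector, {"top_k": TopKSelector}, FirstResultSelector)
```

Centralizing the rule in one helper keeps the four component branches identical. An argument-order slip then cannot affect only one of them.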

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| input_file | Optional[str \| Path] | Optional path to the source concept file to map. | None |
| output_dir | str \| Path | Directory where mapping outputs will be written. | Path('output') |
| translator | Optional[BaseTranslator \| str] | Optional translator component used before retrieval. Expects a BaseTranslator or the translator id in the registry. | None |
| retriever | Optional[BaseRetriever \| str] | Optional retriever component used to fetch candidate concepts. Expects a BaseRetriever or the retriever id in the registry. | None |
| selector | Optional[BaseSelector \| str] | Optional selector component used to choose the final mapped concept. Expects a BaseSelector or the selector id in the registry. | None |
| reranker | Optional[BaseReranker \| str] | Optional reranker component used to reorder retrieved candidates before selection. Expects a BaseReranker or the reranker id in the registry. | None |
| batch_size | Optional[int] | Number of source concepts to process per batch. | None |
| rate_limit | Optional[int] | Optional maximum number of items to process per minute. | None |
| column_mapping | Optional[dict] | Optional mapping from input column names to the expected OMOP-style column names. | None |
| limit_to | Optional[int] | Optional maximum number of input rows to process. | None |
| *args | Any | Additional positional arguments reserved for compatibility. | () |
| **kwargs | Any | Additional keyword arguments reserved for compatibility. | {} |

Returns:

| Type | Description |
|---|---|
| None | None. |

from_task_config(task_config) classmethod

Create a terminology mapper from a task configuration object.

This factory method resolves the configured translator, retriever, selector, and reranker from their registries and initializes a TerminologyMapper with the remaining task parameters.
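A structured configuration can be kept as plain data and turned into constructor arguments, because the constructor also accepts registry IDs as strings. The sketch below is hypothetical. `TaskConfig` only mirrors the attributes that `from_task_config` reads, and loading it from JSON with an unknown-key check is an assumption. `TerminologyMappingTask` itself may be defined differently.

```python
import json
from dataclasses import dataclass, fields
from typing import Any, Dict, Optional


@dataclass
class TaskConfig:
    """Hypothetical stand-in for TerminologyMappingTask."""

    input_file: Optional[str] = None
    output_dir: str = "output"
    translator_id: Optional[str] = None
    retriever_id: Optional[str] = None
    selector_id: Optional[str] = None
    reranker_id: Optional[str] = None
    batch_size: Optional[int] = None
    rate_limit: Optional[int] = None
    column_mapping: Optional[Dict[str, str]] = None
    limit_to: Optional[int] = None


def load_task_config(raw: str) -> TaskConfig:
    data = json.loads(raw)
    unknown = set(data) - {f.name for f in fields(TaskConfig)}
    if unknown:  # fail early on typos such as "batchsize"
        raise ValueError(f"Unknown config keys: {sorted(unknown)}")
    return TaskConfig(**data)


def mapper_kwargs(task: TaskConfig) -> Dict[str, Any]:
    # IDs stay strings; the constructor resolves them, and None means default.
    return {
        "translator": task.translator_id,
        "retriever": task.retriever_id,
        "selector": task.selector_id,
        "reranker": task.reranker_id,
        "batch_size": task.batch_size,
        "rate_limit": task.rate_limit,
        "input_file": task.input_file,
        "output_dir": task.output_dir,
        "column_mapping": task.column_mapping,
        "limit_to": task.limit_to,
    }
```

The resulting dictionary would be passed as `TerminologyMapper(**mapper_kwargs(task))`. The IDs in any real configuration must match entries in aatm's registries.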

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| task_config | TerminologyMappingTask | Structured task configuration describing the mapping pipeline and runtime settings. | required |

Returns:

| Type | Description |
|---|---|
| TerminologyMapper | A configured TerminologyMapper instance. |

Source code in aatm\terminology_mapper.py
@classmethod
def from_task_config(
    cls, task_config: TerminologyMappingTask
) -> "TerminologyMapper":
    """Create a terminology mapper from a task configuration object.

    This factory method resolves the configured translator, retriever,
    selector, and reranker from their registries and initializes a
    ``TerminologyMapper`` with the remaining task parameters.

    Args:
        task_config: Structured task configuration describing the mapping
            pipeline and runtime settings.

    Returns:
        A configured ``TerminologyMapper`` instance.
    """
    translator = (
        load_translator(task_config.translator_id)
        if task_config.translator_id
        else None
    )

    retriever = (
        load_retriever(task_config.retriever_id)
        if task_config.retriever_id
        else None
    )

    selector = (
        load_selector(task_config.selector_id) if task_config.selector_id else None
    )

    reranker = (
        load_reranker(task_config.reranker_id) if task_config.reranker_id else None
    )

    return cls(
        translator=translator,
        retriever=retriever,
        selector=selector,
        reranker=reranker,
        batch_size=task_config.batch_size,
        rate_limit=task_config.rate_limit,
        input_file=task_config.input_file,
        output_dir=task_config.output_dir,
        column_mapping=task_config.column_mapping,
        limit_to=task_config.limit_to,
    )
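The factory above resolves each optional component only when its id is set and otherwise passes None. The sketch below isolates that pattern. The dictionary registries, the `load_optional` helper, the `TaskConfig` dataclass, and the dummy component classes are all hypothetical stand-ins, not the package's `load_*` functions or `TerminologyMappingTask`.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


class DummyRetriever:
    """Hypothetical placeholder for a retriever component."""


class DummySelector:
    """Hypothetical placeholder for a selector component."""


# Hypothetical registries mapping component ids to factories.
RETRIEVERS: Dict[str, Callable[[], object]] = {"dummy": DummyRetriever}
SELECTORS: Dict[str, Callable[[], object]] = {"dummy": DummySelector}


def load_optional(
    registry: Dict[str, Callable[[], object]], component_id: Optional[str]
) -> Optional[object]:
    """Return None for an unset id; otherwise build the registered component."""
    if not component_id:
        return None
    try:
        factory = registry[component_id]
    except KeyError:
        raise ValueError(f"Unknown component id: {component_id!r}") from None
    return factory()


@dataclass
class TaskConfig:
    """Hypothetical subset of a task configuration."""

    retriever_id: Optional[str] = None
    selector_id: Optional[str] = None
    batch_size: int = 100


config = TaskConfig(retriever_id="dummy")
retriever = load_optional(RETRIEVERS, config.retriever_id)  # a DummyRetriever
selector = load_optional(SELECTORS, config.selector_id)  # None: no id configured
```

Failing loudly on an unknown id, rather than silently returning None, keeps a typo in a configuration file from quietly disabling a pipeline stage.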

map(expressions=None, file_path=None, limit_to=None, output_dir=None, return_as='df', save_to_disk=True)

Map source concepts from expressions or a file to standardized concepts.

This method maps either an explicit list of expressions (strings or SourceConcept objects) or the source concepts loaded from a supported input file. It processes them in batches through the pipeline and returns the results as a DataFrame or as a list of MappedSourceConcept objects, depending on return_as. When save_to_disk is True, the mappings are also written to mapped_source_concepts.csv in the output directory.
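Plain strings are wrapped into SourceConcept objects before batching, while existing SourceConcept objects pass through unchanged. A minimal sketch of that normalization step follows. The two-field `SourceConcept` dataclass and the `normalize_expressions` function are hypothetical simplifications, and the sketch raises `TypeError` where the method itself uses an `assert`.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class SourceConcept:
    """Hypothetical, simplified stand-in for the package's SourceConcept."""

    source_code_description: str
    source_code: str = ""


def normalize_expressions(
    expressions: List[Union[str, SourceConcept]],
) -> List[SourceConcept]:
    # Reject anything that is neither a string nor a SourceConcept.
    if not all(isinstance(e, (str, SourceConcept)) for e in expressions):
        raise TypeError("Expressions must be strings or SourceConcept objects.")
    # Wrap bare strings as descriptions; keep SourceConcept objects as-is.
    return [
        SourceConcept(source_code_description=e) if isinstance(e, str) else e
        for e in expressions
    ]


concepts = normalize_expressions(["fever", SourceConcept("dry cough", "R05")])
```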

Parameters:

expressions (Optional[List[str | SourceConcept]], default None): Optional list of expressions to map, given as strings or SourceConcept objects. Takes precedence over file_path when provided.

file_path (str | Path, default None): Optional path to the source concept file. If not provided, the mapper's configured input file is used.

limit_to (int, default None): Optional maximum number of rows to process from the source file. If not provided, the mapper's configured limit is used.

output_dir (str | Path, default None): Optional output directory override for this mapping operation.

return_as (Literal['df', 'mapped_source_concepts'], default 'df'): Return type, either "df" or "mapped_source_concepts".

save_to_disk (bool, default True): Whether to save the results to disk.

Returns:

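DataFrame | List[MappedSourceConcept]: A pandas DataFrame containing the mapped source concepts when return_as is "df", or the list of MappedSourceConcept objects when return_as is "mapped_source_concepts".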

Raises:

ValueError: If neither expressions nor a file path is available, or if the file type is not supported.

Source code in aatm\terminology_mapper.py
def map(
    self,
    expressions: Optional[List[str | SourceConcept]] = None,
    file_path: str | Path = None,
    limit_to: int = None,
    output_dir: str | Path = None,
    return_as: Literal["df", "mapped_source_concepts"] = "df",
    save_to_disk: bool = True,
) -> pd.DataFrame | List[MappedSourceConcept]:
    """Map source concepts from expressions or a file to standardized concepts.

    This method maps either an explicit list of expressions (strings or
    ``SourceConcept`` objects) or the source concepts loaded from a supported
    input file. It processes them in batches through the pipeline and returns
    the results as a DataFrame or as a list of ``MappedSourceConcept``
    objects, depending on ``return_as``. When ``save_to_disk`` is true, the
    mappings are also written to ``mapped_source_concepts.csv`` in the output
    directory.

    Args:
        expressions: Optional list of expressions to map, given as strings or
            ``SourceConcept`` objects. Takes precedence over ``file_path``.
        file_path: Optional path to the source concept file. If not
            provided, the mapper's configured input file is used.
        limit_to: Optional maximum number of rows to process from the
            source file. If not provided, the mapper's configured limit is
            used.
        output_dir: Optional output directory override for this mapping
            operation.
        return_as: Return type. Options: ``"df"`` or
            ``"mapped_source_concepts"``.
        save_to_disk: Whether to save the results to disk.

    Returns:
        A pandas DataFrame when ``return_as`` is ``"df"``; otherwise the list
        of ``MappedSourceConcept`` objects.

    Raises:
        ValueError: If neither expressions nor a file path is available, or
            if the file type is not supported.
    """
    file_path = self.input_file if file_path is None else file_path
    limit_to = self.limit_to if limit_to is None else limit_to

    if expressions is not None:
        assert all(
            isinstance(e, str) or isinstance(e, SourceConcept) for e in expressions
        ), "Expressions must be either strings or SourceConcept objects."

        source_concepts = [
            SourceConcept(source_code_description=e) if isinstance(e, str) else e
            for e in expressions
        ]

    elif file_path is not None:
        if isinstance(file_path, str):
            file_path = Path(file_path)
        if file_path.suffix == ".csv":
            source_concepts = self.map_csv_to_source_concepts(file_path, limit_to)
        else:
            raise ValueError(
                f"Unsupported file type: {file_path.suffix}. File path provided: {file_path}"
            )
    else:
        raise ValueError("No expressions or file path provided")

    assert return_as in ["df", "mapped_source_concepts"], (
        "Parameter return_as must be either 'df' or 'mapped_source_concepts'."
    )

    # map source concepts
    mapped_source_concepts: List[MappedSourceConcept] = []
    translated_source_concepts: List[Translation] = []
    next_allowed_time = time.monotonic()
    for batch_idx in track(
        range(0, len(source_concepts), self.batch_size),
        description=f"Mapping source concepts (batch_size = {self.batch_size})",
    ):
        batch = source_concepts[batch_idx : batch_idx + self.batch_size]

        # rate limit if defined
        if self.rate_limit is not None:
            next_allowed_time = rate_limit(
                len(batch), next_allowed_time, self.rate_limit
            )

        translated_batch: List[Translation] = batch | self.translator
        selected_source_concepts: SelectorResults = (
            translated_batch | self.retriever | self.reranker | self.selector
        )
        translated_source_concepts.extend(translated_batch)
        mapped_source_concepts.extend(
            MappedSourceConcept.from_selector_results(
                batch, selected_source_concepts, translated_batch
            )
        )

    mapped_source_concepts_df = None
    if save_to_disk:
        # write mapped source concepts to csv
        mapped_source_concepts_df = pd.DataFrame(
            [
                mapped_source_concept.to_dict()
                for mapped_source_concept in mapped_source_concepts
            ]
        )

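        # Honor the per-call override instead of always writing to self.output_dir.
        output_dir = (
            Path(output_dir) if output_dir is not None else self.output_dir
        )
        output_dir.mkdir(parents=True, exist_ok=True)

        mapped_source_concepts_df.to_csv(
            output_dir / "mapped_source_concepts.csv", index=False
        )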

    if return_as == "df":
        if mapped_source_concepts_df is None:
            mapped_source_concepts_df = pd.DataFrame(
                [
                    mapped_source_concept.to_dict()
                    for mapped_source_concept in mapped_source_concepts
                ]
            )
        return mapped_source_concepts_df

    elif return_as == "mapped_source_concepts":
        return mapped_source_concepts
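The loop above slices the inputs into batches of batch_size and chains stages with the `|` operator. One way such chaining can work is through the reflected operator `__ror__`: a list does not define `__or__`, so `batch | stage` falls back to `stage.__ror__(batch)`. The `Stage` class, the `batched` helper, and the two toy stages below are hypothetical and only illustrate the mechanism; they are not the package's component classes.

```python
from typing import Any, Callable, Iterator, List


class Stage:
    """Hypothetical pipeline stage: ``data | stage`` returns ``stage.fn(data)``."""

    def __init__(self, fn: Callable[[Any], Any]):
        self.fn = fn

    def __ror__(self, data: Any) -> Any:
        # Called for ``data | stage`` because list has no __or__ to try first.
        return self.fn(data)


def batched(items: List[Any], batch_size: int) -> Iterator[List[Any]]:
    # Same slicing scheme as the mapping loop: [0:n], [n:2n], ...
    for start in range(0, len(items), batch_size):
        yield items[start : start + batch_size]


translate = Stage(lambda xs: [x.lower() for x in xs])
retrieve = Stage(lambda xs: [f"candidate:{x}" for x in xs])

results: List[str] = []
for batch in batched(["Fever", "Cough", "Rash"], batch_size=2):
    # Evaluated left to right: (batch | translate) | retrieve
    results.extend(batch | translate | retrieve)
```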

amap(file_path=None, limit_to=None, return_confidence_scores=True, output_dir=None) async

Asynchronously map source concepts from a file to standardized concepts.

This method is the asynchronous counterpart to map(). It processes source concepts in batches through the configured pipeline using async calls where supported, then writes the mapped results to a CSV file and returns them as a DataFrame.
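In the async variant the whole chain is awaited at once (`await (translated_batch | self.retriever | self.reranker | self.selector)`), which suggests each stage returns an awaitable that the next stage resolves. The sketch below shows one way to obtain that behavior. `AsyncStage` and the toy coroutines are hypothetical, and the package's components may implement chaining differently.

```python
import asyncio
import inspect
from typing import Any, Awaitable, Callable, List


class AsyncStage:
    """Hypothetical async stage: ``data | stage`` returns a coroutine."""

    def __init__(self, fn: Callable[[Any], Awaitable[Any]]):
        self.fn = fn

    def __ror__(self, data: Any) -> Awaitable[Any]:
        async def run() -> Any:
            # Resolve the upstream stage first if it is still pending.
            value = await data if inspect.isawaitable(data) else data
            return await self.fn(value)

        return run()


async def lowercase(items: List[str]) -> List[str]:
    await asyncio.sleep(0)  # stand-in for a network call
    return [item.lower() for item in items]


async def count(items: List[str]) -> int:
    return len(items)


async def main() -> int:
    # Nothing runs until the outermost coroutine is awaited.
    return await (["Fever", "Cough"] | AsyncStage(lowercase) | AsyncStage(count))


total = asyncio.run(main())
```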

Parameters:

file_path (str | Path, default None): Path to the source concept file. Although the parameter defaults to None, a path must be given; unlike map(), this method does not fall back to the mapper's configured input file.

limit_to (int, default None): Optional maximum number of rows to process from the source file.

return_confidence_scores (bool, default True): Whether to add a confidence_score column to the returned DataFrame.

output_dir (str | Path, default None): Optional output directory override for this mapping operation.

Returns:

Type Description
Tuple[DataFrame, List[float]] | DataFrame

A pandas DataFrame containing the mapped source concepts. The

Tuple[DataFrame, List[float]] | DataFrame

current annotation allows for a tuple including confidence scores,

Tuple[DataFrame, List[float]] | DataFrame

but the present implementation returns only the DataFrame.

Raises:

Type Description
ValueError

If no file path is provided or the file type is not supported.

Source code in aatm\terminology_mapper.py
async def amap(
    self,
    file_path: str | Path = None,
    limit_to: int = None,
    return_confidence_scores: bool = True,
    output_dir: str | Path = None,
) -> Tuple[pd.DataFrame, List[float]] | pd.DataFrame:
    """Asynchronously map source concepts from a file to standardized concepts.

    This method is the asynchronous counterpart to ``map()``. It processes
    source concepts in batches through the configured pipeline using async
    calls where supported, then writes the mapped results to a CSV file and
    returns them as a DataFrame.

    Args:
        file_path: Optional path to the source concept file.
        limit_to: Optional maximum number of rows to process from the
            source file.
        return_confidence_scores: Whether to include confidence scores in
            the returned output DataFrame.
        output_dir: Optional output directory override for this mapping
            operation.

    Returns:
        A pandas DataFrame containing the mapped source concepts. The
        current annotation allows for a tuple including confidence scores,
        but the present implementation returns only the DataFrame.

    Raises:
        ValueError: If no file path is provided or the file type is not
            supported.
    """
    if file_path is not None:
        if isinstance(file_path, str):
            file_path = Path(file_path)
        if file_path.suffix == ".csv":
            source_concepts = self.map_csv_to_source_concepts(file_path, limit_to)
        else:
            raise ValueError(
                f"Unsupported file type: {file_path.suffix}. File path provided: {file_path}"
            )
    else:
        raise ValueError("No file path provided")

    # map source concepts
    mapped_source_concepts: List[MappedSourceConcept] = []
    confidence_scores = []
    next_allowed_time = time.monotonic()
    for batch_idx in tqdm(
        range(0, len(source_concepts), self.batch_size),
        desc=f"Mapping source concepts (batch_size = {self.batch_size})",
    ):
        batch = source_concepts[batch_idx : batch_idx + self.batch_size]

        # rate limit if defined
        if self.rate_limit is not None:
            next_allowed_time = rate_limit(
                len(batch), next_allowed_time, self.rate_limit
            )

        translated_batch = await (batch | self.translator)

        selected_source_concepts: SelectorResults = await (
            translated_batch | self.retriever | self.reranker | self.selector
        )
        mapped_source_concepts.extend(
            MappedSourceConcept.from_selector_results(
                batch, selected_source_concepts
            )
        )
        # Confidence is 1 - distance; a missing result yields None.
        confidence_scores.extend(
            [
                1 - (result.distance or 0) if result is not None else None
                for result in selected_source_concepts.results
            ]
        )

    # write mapped source concepts to csv
    mapped_source_concepts_df = pd.DataFrame(
        [
            mapped_source_concept.to_dict()
            for mapped_source_concept in mapped_source_concepts
        ]
    )

    if return_confidence_scores:
        mapped_source_concepts_df["confidence_score"] = confidence_scores

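    # Honor the per-call override instead of always writing to self.output_dir.
    output_dir = Path(output_dir) if output_dir is not None else self.output_dir
    output_dir.mkdir(parents=True, exist_ok=True)

    mapped_source_concepts_df.to_csv(
        output_dir / "mapped_source_concepts.csv", index=False
    )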

    return mapped_source_concepts_df
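Each confidence score is derived from the selected candidate's distance as `1 - distance`, treating a missing distance as 0 and yielding None when no candidate was selected. A minimal sketch of that conversion follows; the `Result` dataclass is a hypothetical stand-in for an entry of `SelectorResults.results`. The value only reads as a confidence if distances lie in [0, 1], which is an assumption about the retriever that this code does not check.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Result:
    """Hypothetical stand-in for one selector result."""

    distance: Optional[float]


def confidence_scores(results: List[Optional[Result]]) -> List[Optional[float]]:
    # Missing distance counts as 0 (score 1); a missing result yields None.
    return [None if r is None else 1 - (r.distance or 0) for r in results]


scores = confidence_scores([Result(0.25), Result(None), None])
# -> [0.75, 1, None]
```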

map_csv_to_source_concepts(file_path, limit_to=None)

Load source concepts from a CSV file and convert them to model objects.

This method reads a CSV file, optionally limits the number of rows, applies any configured column renaming, validates that the required OMOP-style columns are present, drops rows with missing source concept descriptions, and converts the remaining rows into SourceConcept objects.

Parameters:

file_path (Path, required): Path to the CSV file containing source concepts.

limit_to (int, default None): Optional maximum number of rows to load from the file.

Returns:

List[SourceConcept]: A list of SourceConcept objects created from the CSV rows.

Raises:

ValueError: If the input file does not contain the required columns after optional column remapping.

Source code in aatm\terminology_mapper.py
def map_csv_to_source_concepts(
    self, file_path: Path, limit_to: int = None
) -> List[SourceConcept]:
    """Load source concepts from a CSV file and convert them to model objects.

    This method reads a CSV file, optionally limits the number of rows,
    applies any configured column renaming, validates that the required
    OMOP-style columns are present, drops rows with missing source concept
    descriptions, and converts the remaining rows into ``SourceConcept``
    objects.

    Args:
        file_path: Path to the CSV file containing source concepts.
        limit_to: Optional maximum number of rows to load from the file.

    Returns:
        A list of ``SourceConcept`` objects created from the CSV rows.

    Raises:
        ValueError: If the input file does not contain the required columns
            after optional column remapping.
    """
    df = pd.read_csv(file_path, on_bad_lines="skip")

    if limit_to is not None:
        df = df.iloc[:limit_to]

    # Rename columns
    if self.column_mapping is not None:
        df = df.rename(columns=self.column_mapping)

    # Check if all expected columns are present
    if not self.expected_columns.issubset(set(df.columns)):
        raise ValueError(
            f"This function expects a SOURCE_TO_CONCEPT_MAP table as defined by the official OMOP Common Data Model. It must include the following columns: {self.expected_columns}. Either rename the columns or provide the column_mapping argument that maps the current column names to the expected ones. Got columns: {set(df.columns)}"
        )

    # Check for null values
    if df["source_code_description"].isnull().any():
        console.print(
            f"[yellow]Attention:[/yellow] There are {df['source_code_description'].isnull().sum()} null values in the source_code_description column. Those rows will be dropped."
        )
        console.print(
            f"Dropped rows: {df[df['source_code_description'].isnull()].index.to_list()}"
        )
        df = df.dropna(subset=["source_code_description"])

    # Fill missing values before casting; astype(str) would otherwise turn NaN into "nan".
    df = df.fillna("").astype(str)
    # Convert to SourceConcept objects
    source_concepts: List[SourceConcept] = [
        SourceConcept(**row) for row in df.to_dict("records")
    ]

    return source_concepts
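The loading steps above can be reproduced on an in-memory CSV with pandas. To stay short, the sketch checks only two of the expected columns, and `load_rows` and `EXPECTED_COLUMNS` are hypothetical names. It also shows why missing values should be filled before casting: `astype(str)` on its own turns NaN into the string "nan", after which `fillna("")` has nothing left to fill.

```python
import io
from typing import Dict, List, Optional

import pandas as pd

# Hypothetical, shortened subset of the expected OMOP columns.
EXPECTED_COLUMNS = {"source_code", "source_code_description"}


def load_rows(
    csv_text: str,
    column_mapping: Optional[Dict[str, str]] = None,
    limit_to: Optional[int] = None,
) -> List[dict]:
    df = pd.read_csv(io.StringIO(csv_text))
    if limit_to is not None:
        df = df.iloc[:limit_to]
    if column_mapping is not None:
        df = df.rename(columns=column_mapping)
    missing = EXPECTED_COLUMNS - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {sorted(missing)}")
    # Rows without a description have nothing to map, so drop them.
    df = df.dropna(subset=["source_code_description"])
    # Fill before casting so empty cells become "" rather than "nan".
    df = df.fillna("").astype(str)
    return df.to_dict("records")


csv_text = "code,description,note\nA1,Fever,\nB2,,x\nC3,Cough,y\n"
mapping = {"code": "source_code", "description": "source_code_description"}
rows = load_rows(csv_text, column_mapping=mapping)
```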

__call__(expression)

Invoke the mapper as a callable object.

This method wraps a single expression in a list and delegates to map(), so that mapper instances can be used like callable pipeline components.

Parameters:

expression (str, required): Input expression to map.

Returns:

DataFrame: The DataFrame returned by map() for the provided expression.

Source code in aatm\terminology_mapper.py
def __call__(self, expression: str) -> pd.DataFrame:
    """Invoke the mapper as a callable object.

    This method wraps a single expression in a list and delegates to
    ``map()`` so that mapper instances can be used like callable pipeline
    components.

    Args:
        expression: Input expression to map.

    Returns:
        The DataFrame returned by ``map()`` for the provided expression.
    """
    # Wrap the string: map() iterates over ``expressions``, and iterating a
    # bare string would map each character separately.
    return self.map(expressions=[expression])
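Because map() iterates over its expressions argument, a single string passed straight through is treated as a sequence of one-character expressions. The toy `EchoMapper` class below is hypothetical and not part of the package; it shows the difference between passing the bare string and wrapping it in a list.

```python
from typing import List


class EchoMapper:
    """Hypothetical mapper whose map() just upper-cases each expression."""

    def map(self, expressions: List[str]) -> List[str]:
        return [expression.upper() for expression in expressions]

    def __call__(self, expression: str) -> List[str]:
        # Wrap the single expression so map() sees one item, not its characters.
        return self.map([expression])


mapper = EchoMapper()
wrapped = mapper("fever")  # ["FEVER"]
unwrapped = mapper.map("fever")  # ["F", "E", "V", "E", "R"]: one per character
```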

__repr__()

Return the official string representation of the mapper.

Returns:

str: A string representation of the TerminologyMapper instance.

Source code in aatm\terminology_mapper.py
def __repr__(self) -> str:
    """Return the official string representation of the mapper.

    Returns:
        A string representation of the ``TerminologyMapper`` instance.
    """
    return f"{self.__class__.__name__}()"

__str__()

Return a human-readable string representation of the mapper.

Returns:

str: A string representation of the TerminologyMapper instance.

Source code in aatm\terminology_mapper.py
def __str__(self) -> str:
    """Return a human-readable string representation of the mapper.

    Returns:
        A string representation of the ``TerminologyMapper`` instance.
    """
    return f"{self.__class__.__name__}()"

rate_limit(n_docs, next_allowed_time, rate_limit)

Apply a document-based rate limit and return the next allowed time.

This helper delays execution when necessary to ensure that processing does not exceed the configured throughput in documents per minute. It uses a monotonic clock to avoid issues caused by system clock adjustments.

Parameters:

n_docs (int, required): Number of documents in the current batch.

next_allowed_time (float, required): Monotonic timestamp representing the next permitted processing time.

rate_limit (int, required): Maximum number of documents allowed per minute.

Returns:

float: The updated monotonic timestamp indicating when the next batch may be processed.

Source code in aatm\terminology_mapper.py
def rate_limit(n_docs: int, next_allowed_time: float, rate_limit: int) -> float:
    """Apply a document-based rate limit and return the next allowed time.

    This helper delays execution when necessary to ensure that processing does
    not exceed the configured throughput in documents per minute. It uses a
    monotonic clock to avoid issues caused by system clock adjustments.

    Args:
        n_docs: Number of documents in the current batch.
        next_allowed_time: Monotonic timestamp representing the next permitted
            processing time.
        rate_limit: Maximum number of documents allowed per minute.

    Returns:
        The updated monotonic timestamp indicating when the next batch may be
        processed.
    """
    now = time.monotonic()
    if now < next_allowed_time:
        time.sleep(next_allowed_time - now)
        now = time.monotonic()
    # Reserve time for this batch
    next_allowed_time = max(next_allowed_time, now) + n_docs * 60 / rate_limit
    return next_allowed_time
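Each call reserves n_docs * 60 / rate_limit seconds. With rate_limit=600 documents per minute, a batch of 100 therefore occupies a 10-second slot, and the next batch cannot start before that slot ends. The simulation below replays that schedule with a virtual clock instead of time.sleep, so it runs instantly; `simulate_schedule` and its `processing_time` parameter are hypothetical. When processing itself takes longer than the reserved slot, no waiting occurs.

```python
from typing import List


def simulate_schedule(
    batch_sizes: List[int], rate_limit: int, processing_time: float = 0.0
) -> List[float]:
    """Return the virtual start time of each batch under the rate limit."""
    clock = 0.0
    next_allowed = 0.0
    starts: List[float] = []
    for n_docs in batch_sizes:
        # time.sleep would wait here until next_allowed if we are early.
        start = max(clock, next_allowed)
        starts.append(start)
        # Reserve this batch's share of the per-minute budget.
        next_allowed = start + n_docs * 60 / rate_limit
        # Processing advances the clock by a fixed amount.
        clock = start + processing_time
    return starts


simulate_schedule([100, 100, 50], rate_limit=600)  # -> [0.0, 10.0, 20.0]
simulate_schedule([100, 100, 50], rate_limit=600, processing_time=15.0)
# -> [0.0, 15.0, 30.0]: processing is slower than the limit, so no waiting
```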