
API Reference

Init module for winiutils.

core

src package.

This package contains the source code for the winiutils library.

data

Data processing utilities package.

This package provides utilities for data manipulation and processing:

Modules:
  • dataframe: Polars DataFrame cleaning pipeline utilities.
  • structures: Data structure utilities including dicts and text processing.

dataframe

DataFrame processing utilities package.

This package provides utilities for working with Polars DataFrames:

Modules:
  • cleaning: Abstract base class for building DataFrame cleaning pipelines.

cleaning

DataFrame cleaning pipeline utilities using Polars.

This module provides an abstract base class for building extensible DataFrame cleaning pipelines. The CleaningDF class implements an 8-step cleaning pipeline that can be customized by implementing abstract methods in child classes.

The cleaning pipeline executes the following operations in order:
  1. Rename columns to standardized names
  2. Drop columns not in the schema
  3. Fill null values with specified defaults
  4. Convert columns to correct data types
  5. Drop rows where specified column subsets are entirely null
  6. Handle duplicates by aggregating and removing
  7. Sort the DataFrame by specified columns
  8. Validate data quality (types, nulls, NaN values)
Example

    import polars as pl
    from winiutils.core.data.dataframe.cleaning import CleaningDF

    class UserCleaner(CleaningDF):
        USER_ID = "user_id"
        EMAIL = "email"

        @classmethod
        def get_rename_map(cls):
            return {cls.USER_ID: "UserId", cls.EMAIL: "Email"}

        # ... implement other abstract methods

CleaningDF

Bases: ABCLoggingMixin

Abstract base class for cleaning and standardizing DataFrames using Polars.

This class provides a comprehensive pipeline for importing, cleaning, and standardizing data from various sources before loading into databases or other systems. It enforces data quality standards through a series of configurable cleaning operations.

The cleaning pipeline executes in the following order:
  1. Rename columns according to a standardized naming scheme
  2. Drop columns not in the schema
  3. Fill null values with specified defaults
  4. Convert columns to correct data types and apply custom transformations
  5. Drop rows where specified column subsets are entirely null
  6. Handle duplicates by aggregating values and removing duplicates
  7. Sort the DataFrame by specified columns
  8. Validate data quality (correct dtypes, no nulls in required columns, no NaN values)

Child classes must implement abstract methods to define the cleaning configuration:
  • get_rename_map(): Define column name mappings
  • get_col_dtype_map(): Define expected data types for each column
  • get_drop_null_subsets(): Define which column subsets trigger row deletion
  • get_fill_null_map(): Define null value fill strategies
  • get_sort_cols(): Define sort order
  • get_unique_subsets(): Define duplicate detection criteria
  • get_no_null_cols(): Define columns that cannot contain nulls
  • get_col_converter_map(): Define custom column transformations
  • get_add_on_duplicate_cols(): Define columns to aggregate when duplicates are found
  • get_col_precision_map(): Define rounding precision for float columns

Attributes:
  • df: The cleaned Polars DataFrame after the pipeline has executed.

Note
  • Define column names as class-level string constants for reusability
  • NaN values are automatically converted to null for consistency
  • The class inherits automatic method logging from ABCLoggingMixin
Example

class UserCleaner(CleaningDF):
    USER_ID = "user_id"
    EMAIL = "email"
    SCORE = "score"

    @classmethod
    def get_col_dtype_map(cls):
        return {cls.USER_ID: pl.Int64, cls.EMAIL: pl.Utf8}

Source code in src/winiutils/core/data/dataframe/cleaning.py
class CleaningDF(ABCLoggingMixin):
    """Abstract base class for cleaning and standardizing DataFrames using Polars.

    This class provides a comprehensive pipeline for importing, cleaning, and
    standardizing data from various sources before loading into databases or
    other systems. It enforces data quality standards through a series of
    configurable cleaning operations.

    The cleaning pipeline executes in the following order:
        1. Rename columns according to a standardized naming scheme
        2. Drop columns not in the schema
        3. Fill null values with specified defaults
        4. Convert columns to correct data types and apply custom transformations
        5. Drop rows where specified column subsets are entirely null
        6. Handle duplicates by aggregating values and removing duplicates
        7. Sort the DataFrame by specified columns
        8. Validate data quality (correct dtypes, no nulls in required columns,
           no NaN values)

    Child classes must implement abstract methods to define the cleaning
    configuration:
        - ``get_rename_map()``: Define column name mappings
        - ``get_col_dtype_map()``: Define expected data types for each column
        - ``get_drop_null_subsets()``: Define which column subsets trigger row
          deletion
        - ``get_fill_null_map()``: Define null value fill strategies
        - ``get_sort_cols()``: Define sort order
        - ``get_unique_subsets()``: Define duplicate detection criteria
        - ``get_no_null_cols()``: Define columns that cannot contain nulls
        - ``get_col_converter_map()``: Define custom column transformations
        - ``get_add_on_duplicate_cols()``: Define columns to aggregate when
          duplicates are found
        - ``get_col_precision_map()``: Define rounding precision for float columns

    Attributes:
        df: The cleaned Polars DataFrame after the pipeline has executed.

    Note:
        - Define column names as class-level string constants for reusability
        - NaN values are automatically converted to null for consistency
        - The class inherits automatic method logging from ``ABCLoggingMixin``

    Example:
        >>> class UserCleaner(CleaningDF):
        ...     USER_ID = "user_id"
        ...     EMAIL = "email"
        ...     SCORE = "score"
        ...
        ...     @classmethod
        ...     def get_col_dtype_map(cls):
        ...         return {cls.USER_ID: pl.Int64, cls.EMAIL: pl.Utf8}
    """

    @classmethod
    @abstractmethod
    def get_rename_map(cls) -> dict[str, str]:
        """Define column name mappings for standardization.

        This abstract method must be implemented in child classes to specify how
        raw input column names should be renamed to standardized names. Renaming
        is the first operation in the cleaning pipeline, executed before all other
        cleaning operations.

        The mapping format follows the CleaningDF convention of mapping
        standardized names to raw input names. The reverse mapping is applied
        to the DataFrame during cleaning.

        Returns:
            Dictionary mapping standardized column names (keys) to raw input
            column names (values).

        Example:
            >>> @classmethod
            ... def get_rename_map(cls):
            ...     return {
            ...         "user_id": "UserId",
            ...         "email": "Email_Address",
            ...         "created_at": "CreatedDate",
            ...     }
        """

    @classmethod
    @abstractmethod
    def get_col_dtype_map(cls) -> dict[str, type[pl.DataType]]:
        """Define the expected data type for each column in the cleaned DataFrame.

        This abstract method must be implemented in child classes to specify the
        target data types for all columns. The DataFrame will be validated against
        this schema after cleaning, and a TypeError will be raised if any column
        has an incorrect type.

        Returns:
            Dictionary mapping standardized column names to their expected
            Polars data types.

        Example:
            >>> @classmethod
            ... def get_col_dtype_map(cls):
            ...     return {
            ...         "user_id": pl.Int64,
            ...         "email": pl.Utf8,
            ...         "created_at": pl.Date,
            ...         "score": pl.Float64,
            ...     }
        """

    @classmethod
    @abstractmethod
    def get_drop_null_subsets(cls) -> tuple[tuple[str, ...], ...]:
        """Define column subsets for dropping rows with all-null values.

        This abstract method specifies which column subsets should trigger row
        deletion. A row is dropped if ALL columns in a subset are null. Multiple
        subsets can be defined to apply different null-dropping rules. If no
        subsets are defined, rows where all columns are null will be dropped.

        Returns:
            Tuple of column name tuples, where each inner tuple represents one
            subset. A row is dropped if all columns in any subset are null.

        Example:
            >>> @classmethod
            ... def get_drop_null_subsets(cls):
            ...     return (
            ...         ("email", "phone"),  # Drop if both are null
            ...         ("address_line1",),  # Drop if null
            ...     )
        """

    @classmethod
    @abstractmethod
    def get_fill_null_map(cls) -> dict[str, Any]:
        """Define null value fill strategies for each column.

        This abstract method specifies default values to fill null entries in
        each column. This is applied early in the cleaning pipeline after
        column renaming.

        Returns:
            Dictionary mapping column names to their fill values. The fill
            value can be any type appropriate for the column's data type.

        Example:
            >>> @classmethod
            ... def get_fill_null_map(cls):
            ...     return {
            ...         "email": "",
            ...         "phone": "",
            ...         "score": 0,
            ...         "status": "unknown",
            ...     }
        """

    @classmethod
    @abstractmethod
    def get_sort_cols(cls) -> tuple[tuple[str, bool], ...]:
        """Define the sort order for the cleaned DataFrame.

        This abstract method specifies which columns to sort by and in what
        order (ascending or descending). Sorting is applied near the end of
        the cleaning pipeline, after all data transformations are complete.

        Returns:
            Tuple of (column_name, is_descending) tuples. Each tuple specifies
            a column and sort direction. Columns are sorted in the order they
            appear. True = descending, False = ascending.

        Example:
            >>> @classmethod
            ... def get_sort_cols(cls):
            ...     return (
            ...         ("created_at", True),   # Descending
            ...         ("user_id", False),     # Ascending
            ...     )
        """

    @classmethod
    @abstractmethod
    def get_unique_subsets(cls) -> tuple[tuple[str, ...], ...]:
        """Define column subsets for duplicate detection and removal.

        This abstract method specifies which column combinations define
        uniqueness. Rows are considered duplicates if they have identical
        values in all columns of a subset. When duplicates are found, values
        in columns specified by ``get_add_on_duplicate_cols()`` are summed,
        and the first row is kept.

        Returns:
            Tuple of column name tuples, where each inner tuple represents
            one uniqueness constraint. Duplicates are detected and handled
            for each subset independently.

        Example:
            >>> @classmethod
            ... def get_unique_subsets(cls):
            ...     return (
            ...         ("user_id", "date"),      # Unique by user_id and date
            ...         ("transaction_id",),      # Unique by transaction_id
            ...     )
        """

    @classmethod
    @abstractmethod
    def get_no_null_cols(cls) -> tuple[str, ...]:
        """Define columns that must not contain null values.

        This abstract method specifies which columns are required to have
        non-null values. A ValueError is raised during the final validation
        step if any of these columns contain null values.

        Returns:
            Tuple of column names that must not contain null values.

        Example:
            >>> @classmethod
            ... def get_no_null_cols(cls):
            ...     return ("user_id", "email", "created_at")
        """

    @classmethod
    @abstractmethod
    def get_col_converter_map(
        cls,
    ) -> dict[str, Callable[[pl.Series], pl.Series]]:
        """Define custom conversion functions for columns.

        This abstract method specifies custom transformations to apply to
        columns after standard conversions (string stripping, float rounding).
        Each function receives a Polars Series and returns a transformed
        Series. Use ``skip_col_converter`` as a placeholder for columns that
        don't need custom conversion.

        Returns:
            Dictionary mapping column names to their conversion functions.
            Each function takes a Series and returns a transformed Series.

        Example:
            >>> @classmethod
            ... def get_col_converter_map(cls):
            ...     return {
            ...         "email": lambda s: s.str.to_lowercase(),
            ...         "phone": cls.parse_phone_number,
            ...         "created_at": cls.skip_col_converter,
            ...     }
        """

    @classmethod
    @abstractmethod
    def get_add_on_duplicate_cols(cls) -> tuple[str, ...]:
        """Define columns to aggregate when duplicate rows are found.

        This abstract method specifies which columns should have their values
        summed when duplicate rows are detected (based on
        ``get_unique_subsets()``). The summed values are kept in the first row,
        and duplicate rows are removed.

        Returns:
            Tuple of column names whose values should be summed when duplicates
            are found.

        Example:
            >>> @classmethod
            ... def get_add_on_duplicate_cols(cls):
            ...     return ("quantity", "revenue", "impressions")
        """

    @classmethod
    @abstractmethod
    def get_col_precision_map(cls) -> dict[str, int]:
        """Define rounding precision for float columns.

        This abstract method specifies the number of decimal places to round
        float columns to. Rounding is applied during the standard conversion
        phase and uses Kahan summation to compensate for floating-point
        rounding errors.

        Returns:
            Dictionary mapping float column names to their precision
            (number of decimal places).

        Example:
            >>> @classmethod
            ... def get_col_precision_map(cls):
            ...     return {
            ...         "price": 2,
            ...         "percentage": 4,
            ...         "score": 1,
            ...     }
        """

    def __init__(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        """Initialize the CleaningDF and execute the cleaning pipeline.

        Creates a Polars DataFrame with NaN values automatically converted to
        null, then immediately executes the full cleaning pipeline. The schema
        is enforced from ``get_col_dtype_map()``.

        Args:
            *args: Positional arguments passed to ``pl.DataFrame`` constructor.
            **kwargs: Keyword arguments passed to ``pl.DataFrame`` constructor.

        Note:
            The following kwargs are automatically set and will override any
            user-provided values:
                - ``nan_to_null``: Always set to True
                - ``schema``: Set from ``get_col_dtype_map()``
                - ``data``: Replaced with renamed and filtered data
        """
        # create a temp df for standardization, accepting all polars args and kwargs
        temp_df = pl.DataFrame(*args, **kwargs)
        temp_df = self.rename_cols(temp_df)
        temp_df = self.drop_cols(temp_df)

        # enforce standard kwargs and create the final df
        kwargs["data"] = temp_df.to_dict(as_series=True)
        kwargs["nan_to_null"] = True
        kwargs["schema"] = self.get_col_dtype_map()
        self.df = pl.DataFrame(**kwargs)
        self.clean()

    @classmethod
    def get_col_names(cls) -> tuple[str, ...]:
        """Get the standardized column names from the dtype map.

        Returns:
            Tuple of standardized column names in the order they appear
            in ``get_col_dtype_map()``.
        """
        return tuple(cls.get_col_dtype_map().keys())

    def clean(self) -> None:
        """Execute the complete data cleaning pipeline.

        Applies all cleaning operations in the following order:
            1. Fill null values with defaults
            2. Convert columns to correct types and apply transformations
            3. Drop rows with all-null column subsets
            4. Handle duplicates by aggregating and removing
            5. Sort the DataFrame
            6. Validate data quality

        Note:
            Renaming and dropping columns are done during ``__init__`` before
            this method is called. This method is automatically called during
            initialization.
        """
        self.fill_nulls()
        self.convert_cols()
        self.drop_null_subsets()
        self.handle_duplicates()
        self.sort_cols()
        self.check()

    @classmethod
    def raise_on_missing_cols(
        cls,
        map_func: Callable[..., dict[str, Any]],
    ) -> None:
        """Validate that all required columns are present in a configuration map.

        Checks that the columns returned by ``map_func`` contain all columns
        defined in the schema. Raises KeyError if any required columns are
        missing from the map.

        Args:
            map_func: A callable that returns a dict with column names as keys.

        Raises:
            KeyError: If any required columns are missing from the map.
        """
        col_names = cls.get_col_names()
        missing_cols = set(col_names) - set(map_func().keys())
        if missing_cols:
            msg = f"Missing columns in {map_func}: {missing_cols}"
            raise KeyError(msg)

    def rename_cols(self, temp_df: pl.DataFrame) -> pl.DataFrame:
        """Rename columns from raw names to standardized names.

        Applies the reverse of ``get_rename_map()`` to rename columns from
        their raw input names to standardized names.

        Args:
            temp_df: The DataFrame with raw column names to rename.

        Returns:
            DataFrame with columns renamed to standardized names.

        Raises:
            KeyError: If any required columns are missing from the rename map.
        """
        self.raise_on_missing_cols(self.get_rename_map)
        return temp_df.rename(reverse_dict(self.get_rename_map()))

    def drop_cols(self, temp_df: pl.DataFrame) -> pl.DataFrame:
        """Drop columns not defined in the schema.

        Selects only the columns defined in ``get_col_names()``, removing any
        extra columns that may have been in the input data.

        Args:
            temp_df: The DataFrame to filter columns from.

        Returns:
            DataFrame containing only the columns defined in the schema.
        """
        return temp_df.select(self.get_col_names())

    def fill_nulls(self) -> None:
        """Fill null values with defaults from the fill null map.

        Replaces null values in each column with the corresponding fill value
        from ``get_fill_null_map()``.

        Raises:
            KeyError: If any columns are missing from the fill null map.
        """
        self.raise_on_missing_cols(self.get_fill_null_map)
        self.df = self.df.with_columns(
            [
                pl.col(col_name).fill_null(fill_value)
                for col_name, fill_value in self.get_fill_null_map().items()
            ]
        )

    def convert_cols(self) -> None:
        """Apply standard and custom column conversions.

        Orchestrates both standard conversions (string stripping, float
        rounding) and custom conversions defined in ``get_col_converter_map()``.

        Raises:
            KeyError: If any columns are missing from the converter map.
        """
        self.raise_on_missing_cols(self.get_col_converter_map)
        self.standard_convert_cols()
        self.custom_convert_cols()

    def standard_convert_cols(self) -> None:
        """Apply standard conversions based on data type.

        Automatically applies the following transformations:
            - ``pl.Utf8`` columns: Strip leading/trailing whitespace
            - ``pl.Float64`` columns: Round to precision using Kahan summation
        """
        for col_name, dtype in self.get_col_dtype_map().items():
            if dtype == pl.Utf8:
                converter = self.strip_col
            elif dtype == pl.Float64:
                converter = self.round_col
            else:
                continue
            self.df = self.df.with_columns(
                pl.col(col_name).map_batches(converter, return_dtype=dtype)
            )

    def custom_convert_cols(self) -> None:
        """Apply custom conversion functions to columns.

        Applies custom transformations from ``get_col_converter_map()`` to each
        column. Columns marked with ``skip_col_converter`` are skipped.
        """
        self.df = self.df.with_columns(
            [
                pl.col(col_name).map_batches(
                    converter, return_dtype=self.get_col_dtype_map()[col_name]
                )
                for col_name, converter in self.get_col_converter_map().items()
                if converter.__name__ != self.skip_col_converter.__name__  # ty:ignore[unresolved-attribute]
            ]
        )

    @classmethod
    def strip_col(cls, col: pl.Series) -> pl.Series:
        """Remove leading and trailing whitespace from a string column.

        Args:
            col: Polars Series of string type (``pl.Utf8``).

        Returns:
            Series with leading and trailing whitespace removed from each value.
        """
        return col.str.strip_chars()

    @classmethod
    def lower_col(cls, col: pl.Series) -> pl.Series:
        """Convert a string column to lowercase.

        Args:
            col: Polars Series of string type (``pl.Utf8``).

        Returns:
            Series with all characters converted to lowercase.
        """
        return col.str.to_lowercase()

    @classmethod
    def round_col(
        cls,
        col: pl.Series,
        precision: int | None = None,
        *,
        compensate: bool = True,
    ) -> pl.Series:
        """Round a float column to specified precision.

        Uses Kahan summation algorithm to compensate for floating-point
        rounding errors when ``compensate=True``, ensuring that the sum of
        rounded values matches the rounded sum of original values.

        Args:
            col: Polars Series of float type (``pl.Float64``).
            precision: Number of decimal places. If None, uses the value from
                ``get_col_precision_map()`` for this column.
            compensate: If True, use Kahan summation to reduce cumulative
                rounding errors. Defaults to True.

        Returns:
            Series with values rounded to the specified precision.

        Note:
            Kahan summation is slower than simple rounding but provides better
            accuracy for financial or scientific calculations where cumulative
            rounding errors matter.
        """
        if precision is None:
            precision = cls.get_col_precision_map()[str(col.name)]
        if not compensate:
            return col.round(precision)

        # compensate for rounding errors with kahan sum
        error = 0.0
        values = []
        for value in col.to_list():  # Ensure iteration over Python floats
            corrected = value + error
            rounded = round(corrected, precision)
            error = corrected - rounded
            values.append(rounded)

        return pl.Series(name=col.name, values=values, dtype=col.dtype)

    @classmethod
    def skip_col_converter(cls, _col: pl.Series) -> pl.Series:
        """Placeholder to skip custom conversion for a column.

        Use this method in ``get_col_converter_map()`` to indicate that a
        column should not have custom conversion applied. This method should
        never be actually called - it's only used as a marker.

        Args:
            _col: Unused. The column that would be converted.

        Raises:
            NotImplementedError: Always raised if this method is called.

        Example:
            >>> @classmethod
            ... def get_col_converter_map(cls):
            ...     return {
            ...         "email": lambda s: s.str.to_lowercase(),
            ...         "user_id": cls.skip_col_converter,  # No conversion
            ...     }
        """
        msg = (
            "skip_col_converter is just a flag to skip conversion for a column "
            "and should not be actually called."
        )
        raise NotImplementedError(msg)

    def drop_null_subsets(self) -> None:
        """Drop rows where all columns in a subset are null.

        Applies null-dropping rules defined in ``get_drop_null_subsets()``.
        If no subsets are defined, drops rows where all columns are null.
        """
        subsets = self.get_drop_null_subsets()
        if not subsets:
            self.df = self.df.drop_nulls()
            return
        for subset in subsets:
            self.df = self.df.drop_nulls(subset=subset)

    def handle_duplicates(self) -> None:
        """Remove duplicate rows and aggregate specified columns.

        For each uniqueness subset defined in ``get_unique_subsets()``:
            1. Sum values in columns specified by ``get_add_on_duplicate_cols()``
            2. Keep only the first row of each duplicate group

        Example:
            If two rows have the same (user_id, date) and values 1 and 2 in
            the 'quantity' column, the result will have one row with
            quantity=3.
        """
        for subset in self.get_unique_subsets():
            for col in self.get_add_on_duplicate_cols():
                self.df = self.df.with_columns(pl.col(col).sum().over(subset))
            self.df = self.df.unique(subset=subset, keep="first")

    def sort_cols(self) -> None:
        """Sort the DataFrame by columns and directions from get_sort_cols().

        Applies multi-column sorting with per-column sort direction
        (ascending or descending) as defined in ``get_sort_cols()``.
        """
        sort_cols = self.get_sort_cols()
        if not sort_cols:
            return
        cols, desc = zip(*sort_cols, strict=True)
        self.df = self.df.sort(cols, descending=desc)

    def check(self) -> None:
        """Validate data quality after cleaning.

        Runs all validation checks in order:
            1. Correct data types for all columns
            2. No null values in required columns
            3. No NaN values in float columns

        This method is called automatically at the end of the ``clean()``
        pipeline.

        Raises:
            TypeError: If any column has an incorrect data type.
            ValueError: If required columns contain nulls or float columns
                contain NaN values.
        """
        self.check_correct_dtypes()
        self.check_no_null_cols()
        self.check_no_nan()

    def check_correct_dtypes(self) -> None:
        """Validate that all columns have their expected data types.

        Compares the actual DataFrame schema against the expected types
        defined in ``get_col_dtype_map()``.

        Raises:
            TypeError: If any column's actual type doesn't match the expected
                type from the schema.
        """
        schema = self.df.schema
        col_dtype_map = self.get_col_dtype_map()
        for col, dtype in col_dtype_map.items():
            schema_dtype = schema[col]
            if schema_dtype != dtype:
                msg = f"Expected dtype {dtype} for column {col}, got {schema_dtype}"
                raise TypeError(msg)

    def check_no_null_cols(self) -> None:
        """Validate that required columns contain no null values.

        Checks all columns defined in ``get_no_null_cols()`` for null values.

        Raises:
            ValueError: If any column in ``get_no_null_cols()`` contains null
                values.
        """
        no_null_cols = self.get_no_null_cols()
        # Use a single select to check all columns at once
        null_flags = self.df.select(
            [pl.col(col).is_null().any() for col in no_null_cols]
        )
        # Iterate over columns and check if any have nulls
        for col in no_null_cols:
            if null_flags[col].item():
                msg = f"Null values found in column: {col}"
                raise ValueError(msg)

    def check_no_nan(self) -> None:
        """Validate that float columns contain no NaN values.

        Checks all columns with float data types (``pl.Float64``, etc.) for
        NaN values.

        Raises:
            ValueError: If any float column contains NaN values.
        """
        float_cols = [
            col
            for col, dtype in self.get_col_dtype_map().items()
            if issubclass(dtype, FloatType)
        ]
        has_nan = self.df.select(
            pl.any_horizontal(pl.col(float_cols).is_nan().any())
        ).item()
        if has_nan:
            msg = "NaN values found in the dataframe"
            raise ValueError(msg)
__init__(*args, **kwargs)

Initialize the CleaningDF and execute the cleaning pipeline.

Creates a Polars DataFrame with NaN values automatically converted to null, then immediately executes the full cleaning pipeline. The schema is enforced from get_col_dtype_map().

Parameters:

Name Type Description Default
*args Any

Positional arguments passed to pl.DataFrame constructor.

()
**kwargs Any

Keyword arguments passed to pl.DataFrame constructor.

{}
Note

The following kwargs are automatically set and will override any user-provided values:

- nan_to_null: Always set to True
- schema: Set from get_col_dtype_map()
- data: Replaced with renamed and filtered data

Source code in src/winiutils/core/data/dataframe/cleaning.py
def __init__(
    self,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Initialize the CleaningDF and execute the cleaning pipeline.

    Creates a Polars DataFrame with NaN values automatically converted to
    null, then immediately executes the full cleaning pipeline. The schema
    is enforced from ``get_col_dtype_map()``.

    Args:
        *args: Positional arguments passed to ``pl.DataFrame`` constructor.
        **kwargs: Keyword arguments passed to ``pl.DataFrame`` constructor.

    Note:
        The following kwargs are automatically set and will override any
        user-provided values:
            - ``nan_to_null``: Always set to True
            - ``schema``: Set from ``get_col_dtype_map()``
            - ``data``: Replaced with renamed and filtered data
    """
    # create a temp df for standardization, accepting all Polars args and kwargs
    temp_df = pl.DataFrame(*args, **kwargs)
    temp_df = self.rename_cols(temp_df)
    temp_df = self.drop_cols(temp_df)

    # enforce standard kwargs and create the final df
    kwargs["data"] = temp_df.to_dict(as_series=True)
    kwargs["nan_to_null"] = True
    kwargs["schema"] = self.get_col_dtype_map()
    self.df = pl.DataFrame(**kwargs)
    self.clean()
check()

Validate data quality after cleaning.

Runs all validation checks in order:
  1. Correct data types for all columns
  2. No null values in required columns
  3. No NaN values in float columns

This method is called automatically at the end of the clean() pipeline.

Raises:

Type Description
TypeError

If any column has an incorrect data type.

ValueError

If required columns contain nulls or float columns contain NaN values.

Source code in src/winiutils/core/data/dataframe/cleaning.py
def check(self) -> None:
    """Validate data quality after cleaning.

    Runs all validation checks in order:
        1. Correct data types for all columns
        2. No null values in required columns
        3. No NaN values in float columns

    This method is called automatically at the end of the ``clean()``
    pipeline.

    Raises:
        TypeError: If any column has an incorrect data type.
        ValueError: If required columns contain nulls or float columns
            contain NaN values.
    """
    self.check_correct_dtypes()
    self.check_no_null_cols()
    self.check_no_nan()
check_correct_dtypes()

Validate that all columns have their expected data types.

Compares the actual DataFrame schema against the expected types defined in get_col_dtype_map().

Raises:

Type Description
TypeError

If any column's actual type doesn't match the expected type from the schema.

Source code in src/winiutils/core/data/dataframe/cleaning.py
def check_correct_dtypes(self) -> None:
    """Validate that all columns have their expected data types.

    Compares the actual DataFrame schema against the expected types
    defined in ``get_col_dtype_map()``.

    Raises:
        TypeError: If any column's actual type doesn't match the expected
            type from the schema.
    """
    schema = self.df.schema
    col_dtype_map = self.get_col_dtype_map()
    for col, dtype in col_dtype_map.items():
        schema_dtype = schema[col]
        if schema_dtype != dtype:
            msg = f"Expected dtype {dtype} for column {col}, got {schema_dtype}"
            raise TypeError(msg)
check_no_nan()

Validate that float columns contain no NaN values.

Checks all columns with float data types (pl.Float64, etc.) for NaN values.

Raises:

Type Description
ValueError

If any float column contains NaN values.

Source code in src/winiutils/core/data/dataframe/cleaning.py
def check_no_nan(self) -> None:
    """Validate that float columns contain no NaN values.

    Checks all columns with float data types (``pl.Float64``, etc.) for
    NaN values.

    Raises:
        ValueError: If any float column contains NaN values.
    """
    float_cols = [
        col
        for col, dtype in self.get_col_dtype_map().items()
        if issubclass(dtype, FloatType)
    ]
    has_nan = self.df.select(
        pl.any_horizontal(pl.col(float_cols).is_nan().any())
    ).item()
    if has_nan:
        msg = "NaN values found in the dataframe"
        raise ValueError(msg)
check_no_null_cols()

Validate that required columns contain no null values.

Checks all columns defined in get_no_null_cols() for null values.

Raises:

Type Description
ValueError

If any column in get_no_null_cols() contains null values.

Source code in src/winiutils/core/data/dataframe/cleaning.py
def check_no_null_cols(self) -> None:
    """Validate that required columns contain no null values.

    Checks all columns defined in ``get_no_null_cols()`` for null values.

    Raises:
        ValueError: If any column in ``get_no_null_cols()`` contains null
            values.
    """
    no_null_cols = self.get_no_null_cols()
    # Use a single select to check all columns at once
    null_flags = self.df.select(
        [pl.col(col).is_null().any() for col in no_null_cols]
    )
    # Iterate over columns and check if any have nulls
    for col in no_null_cols:
        if null_flags[col].item():
            msg = f"Null values found in column: {col}"
            raise ValueError(msg)
clean()

Execute the complete data cleaning pipeline.

Applies all cleaning operations in the following order:
  1. Fill null values with defaults
  2. Convert columns to correct types and apply transformations
  3. Drop rows with all-null column subsets
  4. Handle duplicates by aggregating and removing
  5. Sort the DataFrame
  6. Validate data quality
Note

Renaming and dropping columns are done during __init__ before this method is called. This method is automatically called during initialization.

Source code in src/winiutils/core/data/dataframe/cleaning.py
def clean(self) -> None:
    """Execute the complete data cleaning pipeline.

    Applies all cleaning operations in the following order:
        1. Fill null values with defaults
        2. Convert columns to correct types and apply transformations
        3. Drop rows with all-null column subsets
        4. Handle duplicates by aggregating and removing
        5. Sort the DataFrame
        6. Validate data quality

    Note:
        Renaming and dropping columns are done during ``__init__`` before
        this method is called. This method is automatically called during
        initialization.
    """
    self.fill_nulls()
    self.convert_cols()
    self.drop_null_subsets()
    self.handle_duplicates()
    self.sort_cols()
    self.check()
convert_cols()

Apply standard and custom column conversions.

Orchestrates both standard conversions (string stripping, float rounding) and custom conversions defined in get_col_converter_map().

Raises:

Type Description
KeyError

If any columns are missing from the converter map.

Source code in src/winiutils/core/data/dataframe/cleaning.py
def convert_cols(self) -> None:
    """Apply standard and custom column conversions.

    Orchestrates both standard conversions (string stripping, float
    rounding) and custom conversions defined in ``get_col_converter_map()``.

    Raises:
        KeyError: If any columns are missing from the converter map.
    """
    self.raise_on_missing_cols(self.get_col_converter_map)
    self.standard_convert_cols()
    self.custom_convert_cols()
custom_convert_cols()

Apply custom conversion functions to columns.

Applies custom transformations from get_col_converter_map() to each column. Columns marked with skip_col_converter are skipped.

Source code in src/winiutils/core/data/dataframe/cleaning.py
def custom_convert_cols(self) -> None:
    """Apply custom conversion functions to columns.

    Applies custom transformations from ``get_col_converter_map()`` to each
    column. Columns marked with ``skip_col_converter`` are skipped.
    """
    self.df = self.df.with_columns(
        [
            pl.col(col_name).map_batches(
                converter, return_dtype=self.get_col_dtype_map()[col_name]
            )
            for col_name, converter in self.get_col_converter_map().items()
            if converter.__name__ != self.skip_col_converter.__name__  # ty:ignore[unresolved-attribute]
        ]
    )
drop_cols(temp_df)

Drop columns not defined in the schema.

Selects only the columns defined in get_col_names(), removing any extra columns that may have been in the input data.

Parameters:

Name Type Description Default
temp_df DataFrame

The DataFrame to filter columns from.

required

Returns:

Type Description
DataFrame

DataFrame containing only the columns defined in the schema.

Source code in src/winiutils/core/data/dataframe/cleaning.py
def drop_cols(self, temp_df: pl.DataFrame) -> pl.DataFrame:
    """Drop columns not defined in the schema.

    Selects only the columns defined in ``get_col_names()``, removing any
    extra columns that may have been in the input data.

    Args:
        temp_df: The DataFrame to filter columns from.

    Returns:
        DataFrame containing only the columns defined in the schema.
    """
    return temp_df.select(self.get_col_names())
drop_null_subsets()

Drop rows where all columns in a subset are null.

Applies null-dropping rules defined in get_drop_null_subsets(). If no subsets are defined, drops rows where all columns are null.

Source code in src/winiutils/core/data/dataframe/cleaning.py
def drop_null_subsets(self) -> None:
    """Drop rows where all columns in a subset are null.

    Applies null-dropping rules defined in ``get_drop_null_subsets()``.
    If no subsets are defined, drops rows where all columns are null.
    """
    subsets = self.get_drop_null_subsets()
    if not subsets:
        self.df = self.df.drop_nulls()
        return
    for subset in subsets:
        self.df = self.df.drop_nulls(subset=subset)
fill_nulls()

Fill null values with defaults from the fill null map.

Replaces null values in each column with the corresponding fill value from get_fill_null_map().

Raises:

Type Description
KeyError

If any columns are missing from the fill null map.

Source code in src/winiutils/core/data/dataframe/cleaning.py
def fill_nulls(self) -> None:
    """Fill null values with defaults from the fill null map.

    Replaces null values in each column with the corresponding fill value
    from ``get_fill_null_map()``.

    Raises:
        KeyError: If any columns are missing from the fill null map.
    """
    self.raise_on_missing_cols(self.get_fill_null_map)
    self.df = self.df.with_columns(
        [
            pl.col(col_name).fill_null(fill_value)
            for col_name, fill_value in self.get_fill_null_map().items()
        ]
    )
get_add_on_duplicate_cols() abstractmethod classmethod

Define columns to aggregate when duplicate rows are found.

This abstract method specifies which columns should have their values summed when duplicate rows are detected (based on get_unique_subsets()). The summed values are kept in the first row, and duplicate rows are removed.

Returns:

Type Description
tuple[str, ...]

Tuple of column names whose values should be summed when duplicates are found.

Example

>>> @classmethod
... def get_add_on_duplicate_cols(cls):
...     return ("quantity", "revenue", "impressions")

Source code in src/winiutils/core/data/dataframe/cleaning.py
@classmethod
@abstractmethod
def get_add_on_duplicate_cols(cls) -> tuple[str, ...]:
    """Define columns to aggregate when duplicate rows are found.

    This abstract method specifies which columns should have their values
    summed when duplicate rows are detected (based on
    ``get_unique_subsets()``). The summed values are kept in the first row,
    and duplicate rows are removed.

    Returns:
        Tuple of column names whose values should be summed when duplicates
        are found.

    Example:
        >>> @classmethod
        ... def get_add_on_duplicate_cols(cls):
        ...     return ("quantity", "revenue", "impressions")
    """
get_col_converter_map() abstractmethod classmethod

Define custom conversion functions for columns.

This abstract method specifies custom transformations to apply to columns after standard conversions (string stripping, float rounding). Each function receives a Polars Series and returns a transformed Series. Use skip_col_converter as a placeholder for columns that don't need custom conversion.

Returns:

Type Description
dict[str, Callable[[Series], Series]]

Dictionary mapping column names to their conversion functions. Each function takes a Series and returns a transformed Series.

Example

>>> @classmethod
... def get_col_converter_map(cls):
...     return {
...         "email": lambda s: s.str.to_lowercase(),
...         "phone": cls.parse_phone_number,
...         "created_at": cls.skip_col_converter,
...     }

Source code in src/winiutils/core/data/dataframe/cleaning.py
@classmethod
@abstractmethod
def get_col_converter_map(
    cls,
) -> dict[str, Callable[[pl.Series], pl.Series]]:
    """Define custom conversion functions for columns.

    This abstract method specifies custom transformations to apply to
    columns after standard conversions (string stripping, float rounding).
    Each function receives a Polars Series and returns a transformed
    Series. Use ``skip_col_converter`` as a placeholder for columns that
    don't need custom conversion.

    Returns:
        Dictionary mapping column names to their conversion functions.
        Each function takes a Series and returns a transformed Series.

    Example:
        >>> @classmethod
        ... def get_col_converter_map(cls):
        ...     return {
        ...         "email": lambda s: s.str.to_lowercase(),
        ...         "phone": cls.parse_phone_number,
        ...         "created_at": cls.skip_col_converter,
        ...     }
    """
get_col_dtype_map() abstractmethod classmethod

Define the expected data type for each column in the cleaned DataFrame.

This abstract method must be implemented in child classes to specify the target data types for all columns. The DataFrame will be validated against this schema after cleaning, and a TypeError will be raised if any column has an incorrect type.

Returns:

Type Description
dict[str, type[DataType]]

Dictionary mapping standardized column names to their expected Polars data types.

Example

>>> @classmethod
... def get_col_dtype_map(cls):
...     return {
...         "user_id": pl.Int64,
...         "email": pl.Utf8,
...         "created_at": pl.Date,
...         "score": pl.Float64,
...     }

Source code in src/winiutils/core/data/dataframe/cleaning.py
@classmethod
@abstractmethod
def get_col_dtype_map(cls) -> dict[str, type[pl.DataType]]:
    """Define the expected data type for each column in the cleaned DataFrame.

    This abstract method must be implemented in child classes to specify the
    target data types for all columns. The DataFrame will be validated against
    this schema after cleaning, and a TypeError will be raised if any column
    has an incorrect type.

    Returns:
        Dictionary mapping standardized column names to their expected
        Polars data types.

    Example:
        >>> @classmethod
        ... def get_col_dtype_map(cls):
        ...     return {
        ...         "user_id": pl.Int64,
        ...         "email": pl.Utf8,
        ...         "created_at": pl.Date,
        ...         "score": pl.Float64,
        ...     }
    """
get_col_names() classmethod

Get the standardized column names from the dtype map.

Returns:

Type Description
tuple[str, ...]

Tuple of standardized column names in the order they appear in get_col_dtype_map().

Source code in src/winiutils/core/data/dataframe/cleaning.py
@classmethod
def get_col_names(cls) -> tuple[str, ...]:
    """Get the standardized column names from the dtype map.

    Returns:
        Tuple of standardized column names in the order they appear
        in ``get_col_dtype_map()``.
    """
    return tuple(cls.get_col_dtype_map().keys())
get_col_precision_map() abstractmethod classmethod

Define rounding precision for float columns.

This abstract method specifies the number of decimal places to round float columns to. Rounding is applied during the standard conversion phase and uses Kahan summation to compensate for floating-point rounding errors.

Returns:

Type Description
dict[str, int]

Dictionary mapping float column names to their precision (number of decimal places).

Example

>>> @classmethod
... def get_col_precision_map(cls):
...     return {
...         "price": 2,
...         "percentage": 4,
...         "score": 1,
...     }

Source code in src/winiutils/core/data/dataframe/cleaning.py
@classmethod
@abstractmethod
def get_col_precision_map(cls) -> dict[str, int]:
    """Define rounding precision for float columns.

    This abstract method specifies the number of decimal places to round
    float columns to. Rounding is applied during the standard conversion
    phase and uses Kahan summation to compensate for floating-point
    rounding errors.

    Returns:
        Dictionary mapping float column names to their precision
        (number of decimal places).

    Example:
        >>> @classmethod
        ... def get_col_precision_map(cls):
        ...     return {
        ...         "price": 2,
        ...         "percentage": 4,
        ...         "score": 1,
        ...     }
    """
get_drop_null_subsets() abstractmethod classmethod

Define column subsets for dropping rows with all-null values.

This abstract method specifies which column subsets should trigger row deletion. A row is dropped if ALL columns in a subset are null. Multiple subsets can be defined to apply different null-dropping rules. If no subsets are defined, rows where all columns are null will be dropped.

Returns:

Type Description
tuple[tuple[str, ...], ...]

Tuple of column name tuples, where each inner tuple represents one subset. A row is dropped if all columns in any subset are null.

Example

>>> @classmethod
... def get_drop_null_subsets(cls):
...     return (
...         ("email", "phone"),  # Drop if both are null
...         ("address_line1",),  # Drop if null
...     )

Source code in src/winiutils/core/data/dataframe/cleaning.py
@classmethod
@abstractmethod
def get_drop_null_subsets(cls) -> tuple[tuple[str, ...], ...]:
    """Define column subsets for dropping rows with all-null values.

    This abstract method specifies which column subsets should trigger row
    deletion. A row is dropped if ALL columns in a subset are null. Multiple
    subsets can be defined to apply different null-dropping rules. If no
    subsets are defined, rows where all columns are null will be dropped.

    Returns:
        Tuple of column name tuples, where each inner tuple represents one
        subset. A row is dropped if all columns in any subset are null.

    Example:
        >>> @classmethod
        ... def get_drop_null_subsets(cls):
        ...     return (
        ...         ("email", "phone"),  # Drop if both are null
        ...         ("address_line1",),  # Drop if null
        ...     )
    """
get_fill_null_map() abstractmethod classmethod

Define null value fill strategies for each column.

This abstract method specifies default values to fill null entries in each column. This is applied early in the cleaning pipeline after column renaming.

Returns:

Type Description
dict[str, Any]

Dictionary mapping column names to their fill values. The fill value can be any type appropriate for the column's data type.

Example

>>> @classmethod
... def get_fill_null_map(cls):
...     return {
...         "email": "",
...         "phone": "",
...         "score": 0,
...         "status": "unknown",
...     }

Source code in src/winiutils/core/data/dataframe/cleaning.py
@classmethod
@abstractmethod
def get_fill_null_map(cls) -> dict[str, Any]:
    """Define null value fill strategies for each column.

    This abstract method specifies default values to fill null entries in
    each column. This is applied early in the cleaning pipeline after
    column renaming.

    Returns:
        Dictionary mapping column names to their fill values. The fill
        value can be any type appropriate for the column's data type.

    Example:
        >>> @classmethod
        ... def get_fill_null_map(cls):
        ...     return {
        ...         "email": "",
        ...         "phone": "",
        ...         "score": 0,
        ...         "status": "unknown",
        ...     }
    """
get_no_null_cols() abstractmethod classmethod

Define columns that must not contain null values.

This abstract method specifies which columns are required to have non-null values. A ValueError is raised during the final validation step if any of these columns contain null values.

Returns:

Type Description
tuple[str, ...]

Tuple of column names that must not contain null values.

Example

>>> @classmethod
... def get_no_null_cols(cls):
...     return ("user_id", "email", "created_at")

Source code in src/winiutils/core/data/dataframe/cleaning.py
@classmethod
@abstractmethod
def get_no_null_cols(cls) -> tuple[str, ...]:
    """Define columns that must not contain null values.

    This abstract method specifies which columns are required to have
    non-null values. A ValueError is raised during the final validation
    step if any of these columns contain null values.

    Returns:
        Tuple of column names that must not contain null values.

    Example:
        >>> @classmethod
        ... def get_no_null_cols(cls):
        ...     return ("user_id", "email", "created_at")
    """
get_rename_map() abstractmethod classmethod

Define column name mappings for standardization.

This abstract method must be implemented in child classes to specify how raw input column names should be renamed to standardized names. Renaming is the first operation in the cleaning pipeline, executed before all other cleaning operations.

The mapping format follows the CleaningDF convention of mapping standardized names to raw input names. The reverse mapping is applied to the DataFrame during cleaning.

Returns:

Type Description
dict[str, str]

Dictionary mapping standardized column names (keys) to raw input column names (values).

Example

>>> @classmethod
... def get_rename_map(cls):
...     return {
...         "user_id": "UserId",
...         "email": "Email_Address",
...         "created_at": "CreatedDate",
...     }

Source code in src/winiutils/core/data/dataframe/cleaning.py
@classmethod
@abstractmethod
def get_rename_map(cls) -> dict[str, str]:
    """Define column name mappings for standardization.

    This abstract method must be implemented in child classes to specify how
    raw input column names should be renamed to standardized names. Renaming
    is the first operation in the cleaning pipeline, executed before all other
    cleaning operations.

    The mapping format follows the CleaningDF convention of mapping
    standardized names to raw input names. The reverse mapping is applied
    to the DataFrame during cleaning.

    Returns:
        Dictionary mapping standardized column names (keys) to raw input
        column names (values).

    Example:
        >>> @classmethod
        ... def get_rename_map(cls):
        ...     return {
        ...         "user_id": "UserId",
        ...         "email": "Email_Address",
        ...         "created_at": "CreatedDate",
        ...     }
    """
get_sort_cols() abstractmethod classmethod

Define the sort order for the cleaned DataFrame.

This abstract method specifies which columns to sort by and in what order (ascending or descending). Sorting is applied near the end of the cleaning pipeline, after all data transformations are complete.

Returns:

Type Description
tuple[tuple[str, bool], ...]

Tuple of (column_name, is_descending) tuples. Each tuple specifies a column and sort direction. Columns are sorted in the order they appear. True = descending, False = ascending.

Example

>>> @classmethod
... def get_sort_cols(cls):
...     return (
...         ("created_at", True),   # Descending
...         ("user_id", False),     # Ascending
...     )

Source code in src/winiutils/core/data/dataframe/cleaning.py
@classmethod
@abstractmethod
def get_sort_cols(cls) -> tuple[tuple[str, bool], ...]:
    """Define the sort order for the cleaned DataFrame.

    This abstract method specifies which columns to sort by and in what
    order (ascending or descending). Sorting is applied near the end of
    the cleaning pipeline, after all data transformations are complete.

    Returns:
        Tuple of (column_name, is_descending) tuples. Each tuple specifies
        a column and sort direction. Columns are sorted in the order they
        appear. True = descending, False = ascending.

    Example:
        >>> @classmethod
        ... def get_sort_cols(cls):
        ...     return (
        ...         ("created_at", True),   # Descending
        ...         ("user_id", False),     # Ascending
        ...     )
    """
get_unique_subsets() abstractmethod classmethod

Define column subsets for duplicate detection and removal.

This abstract method specifies which column combinations define uniqueness. Rows are considered duplicates if they have identical values in all columns of a subset. When duplicates are found, values in columns specified by get_add_on_duplicate_cols() are summed, and the first row is kept.

Returns:

Type Description
tuple[tuple[str, ...], ...]

Tuple of column name tuples, where each inner tuple represents one uniqueness constraint. Duplicates are detected and handled for each subset independently.

Example

>>> @classmethod
... def get_unique_subsets(cls):
...     return (
...         ("user_id", "date"),      # Unique by user_id and date
...         ("transaction_id",),      # Unique by transaction_id
...     )

Source code in src/winiutils/core/data/dataframe/cleaning.py
@classmethod
@abstractmethod
def get_unique_subsets(cls) -> tuple[tuple[str, ...], ...]:
    """Define column subsets for duplicate detection and removal.

    This abstract method specifies which column combinations define
    uniqueness. Rows are considered duplicates if they have identical
    values in all columns of a subset. When duplicates are found, values
    in columns specified by ``get_add_on_duplicate_cols()`` are summed,
    and the first row is kept.

    Returns:
        Tuple of column name tuples, where each inner tuple represents
        one uniqueness constraint. Duplicates are detected and handled
        for each subset independently.

    Example:
        >>> @classmethod
        ... def get_unique_subsets(cls):
        ...     return (
        ...         ("user_id", "date"),      # Unique by user_id and date
        ...         ("transaction_id",),      # Unique by transaction_id
        ...     )
    """
handle_duplicates()

Remove duplicate rows and aggregate specified columns.

For each uniqueness subset defined in get_unique_subsets():
  1. Sum values in columns specified by get_add_on_duplicate_cols()
  2. Keep only the first row of each duplicate group

Example

If two rows have the same (user_id, date) and values 1 and 2 in the 'quantity' column, the result will have one row with quantity=3.

Source code in src/winiutils/core/data/dataframe/cleaning.py
def handle_duplicates(self) -> None:
    """Remove duplicate rows and aggregate specified columns.

    For each uniqueness subset defined in ``get_unique_subsets()``:
        1. Sum values in columns specified by ``get_add_on_duplicate_cols()``
        2. Keep only the first row of each duplicate group

    Example:
        If two rows have the same (user_id, date) and values 1 and 2 in
        the 'quantity' column, the result will have one row with
        quantity=3.
    """
    for subset in self.get_unique_subsets():
        for col in self.get_add_on_duplicate_cols():
            self.df = self.df.with_columns(pl.col(col).sum().over(subset))
        self.df = self.df.unique(subset=subset, keep="first")
lower_col(col) classmethod

Convert a string column to lowercase.

Parameters:

Name Type Description Default
col Series

Polars Series of string type (pl.Utf8).

required

Returns:

Type Description
Series

Series with all characters converted to lowercase.

Source code in src/winiutils/core/data/dataframe/cleaning.py
@classmethod
def lower_col(cls, col: pl.Series) -> pl.Series:
    """Convert a string column to lowercase.

    Args:
        col: Polars Series of string type (``pl.Utf8``).

    Returns:
        Series with all characters converted to lowercase.
    """
    return col.str.to_lowercase()
raise_on_missing_cols(map_func) classmethod

Validate that all required columns are present in a configuration map.

Checks that the columns returned by map_func contain all columns defined in the schema. Raises KeyError if any required columns are missing from the map.

Parameters:

Name Type Description Default
map_func Callable[..., dict[str, Any]]

A callable that returns a dict with column names as keys.

required

Raises:

Type Description
KeyError

If any required columns are missing from the map.

Source code in src/winiutils/core/data/dataframe/cleaning.py
@classmethod
def raise_on_missing_cols(
    cls,
    map_func: Callable[..., dict[str, Any]],
) -> None:
    """Validate that all required columns are present in a configuration map.

    Checks that the columns returned by ``map_func`` contain all columns
    defined in the schema. Raises KeyError if any required columns are
    missing from the map.

    Args:
        map_func: A callable that returns a dict with column names as keys.

    Raises:
        KeyError: If any required columns are missing from the map.
    """
    col_names = cls.get_col_names()
    missing_cols = set(col_names) - set(map_func().keys())
    if missing_cols:
        msg = f"Missing columns in {map_func}: {missing_cols}"
        raise KeyError(msg)
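
The check reduces to a plain set difference; a minimal standalone sketch (function and argument names are hypothetical):

```python
def raise_on_missing_cols(required_cols, col_map):
    """Raise KeyError when the map lacks any required column name."""
    missing = set(required_cols) - set(col_map)
    if missing:
        raise KeyError(f"Missing columns: {sorted(missing)}")

# complete map passes silently
raise_on_missing_cols(["user_id", "email"], {"user_id": "UserId", "email": "Email"})

# incomplete map raises
try:
    raise_on_missing_cols(["user_id", "email"], {"user_id": "UserId"})
except KeyError as exc:
    print(exc)
```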
rename_cols(temp_df)

Rename columns from raw names to standardized names.

Applies the reverse of get_rename_map() to rename columns from their raw input names to standardized names.

Parameters:

Name Type Description Default
temp_df DataFrame

The DataFrame with raw column names to rename.

required

Returns:

Type Description
DataFrame

DataFrame with columns renamed to standardized names.

Raises:

Type Description
KeyError

If any required columns are missing from the rename map.

Source code in src/winiutils/core/data/dataframe/cleaning.py
def rename_cols(self, temp_df: pl.DataFrame) -> pl.DataFrame:
    """Rename columns from raw names to standardized names.

    Applies the reverse of ``get_rename_map()`` to rename columns from
    their raw input names to standardized names.

    Args:
        temp_df: The DataFrame with raw column names to rename.

    Returns:
        DataFrame with columns renamed to standardized names.

    Raises:
        KeyError: If any required columns are missing from the rename map.
    """
    self.raise_on_missing_cols(self.get_rename_map)
    return temp_df.rename(reverse_dict(self.get_rename_map()))
round_col(col, precision=None, *, compensate=True) classmethod

Round a float column to specified precision.

Uses Kahan summation algorithm to compensate for floating-point rounding errors when compensate=True, ensuring that the sum of rounded values matches the rounded sum of original values.

Parameters:

Name Type Description Default
col Series

Polars Series of float type (pl.Float64).

required
precision int | None

Number of decimal places. If None, uses the value from get_col_precision_map() for this column.

None
compensate bool

If True, use Kahan summation to reduce cumulative rounding errors. Defaults to True.

True

Returns:

Type Description
Series

Series with values rounded to the specified precision.

Note

Kahan summation is slower than simple rounding but provides better accuracy for financial or scientific calculations where cumulative rounding errors matter.

Source code in src/winiutils/core/data/dataframe/cleaning.py
@classmethod
def round_col(
    cls,
    col: pl.Series,
    precision: int | None = None,
    *,
    compensate: bool = True,
) -> pl.Series:
    """Round a float column to specified precision.

    Uses Kahan summation algorithm to compensate for floating-point
    rounding errors when ``compensate=True``, ensuring that the sum of
    rounded values matches the rounded sum of original values.

    Args:
        col: Polars Series of float type (``pl.Float64``).
        precision: Number of decimal places. If None, uses the value from
            ``get_col_precision_map()`` for this column.
        compensate: If True, use Kahan summation to reduce cumulative
            rounding errors. Defaults to True.

    Returns:
        Series with values rounded to the specified precision.

    Note:
        Kahan summation is slower than simple rounding but provides better
        accuracy for financial or scientific calculations where cumulative
        rounding errors matter.
    """
    if precision is None:
        precision = cls.get_col_precision_map()[str(col.name)]
    if not compensate:
        return col.round(precision)

    # compensate for rounding errors with kahan sum
    error = 0.0
    values = []
    for value in col.to_list():  # Ensure iteration over Python floats
        corrected = value + error
        rounded = round(corrected, precision)
        error = corrected - rounded
        values.append(rounded)

    return pl.Series(name=col.name, values=values, dtype=col.dtype)
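
The compensation loop can be exercised on plain Python floats; note how the compensated results preserve the rounded total, whereas naive rounding of `[0.333, 0.333, 0.334]` gives `[0.33, 0.33, 0.33]` and loses 0.01:

```python
def round_compensated(values, precision):
    """Round each value, carrying the rounding error into the next one (Kahan-style)."""
    error = 0.0
    out = []
    for value in values:
        corrected = value + error
        rounded = round(corrected, precision)
        error = corrected - rounded  # leftover error feeds the next value
        out.append(rounded)
    return out

values = [0.333, 0.333, 0.334]  # sums to 1.0
result = round_compensated(values, 2)
print(result)  # [0.33, 0.34, 0.33] -- the sum is still 1.0
```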
skip_col_converter(_col) classmethod

Placeholder to skip custom conversion for a column.

Use this method in get_col_converter_map() to indicate that a column should not have custom conversion applied. This method should never actually be called; it is used only as a marker.

Parameters:

Name Type Description Default
_col Series

Unused. The column that would be converted.

required

Raises:

Type Description
NotImplementedError

Always raised if this method is called.

Example

>>> @classmethod
... def get_col_converter_map(cls):
...     return {
...         "email": lambda s: s.str.to_lowercase(),
...         "user_id": cls.skip_col_converter,  # No conversion
...     }

Source code in src/winiutils/core/data/dataframe/cleaning.py
@classmethod
def skip_col_converter(cls, _col: pl.Series) -> pl.Series:
    """Placeholder to skip custom conversion for a column.

    Use this method in ``get_col_converter_map()`` to indicate that a
    column should not have custom conversion applied. This method should
    never be actually called - it's only used as a marker.

    Args:
        _col: Unused. The column that would be converted.

    Raises:
        NotImplementedError: Always raised if this method is called.

    Example:
        >>> @classmethod
        ... def get_col_converter_map(cls):
        ...     return {
        ...         "email": lambda s: s.str.to_lowercase(),
        ...         "user_id": cls.skip_col_converter,  # No conversion
        ...     }
    """
    msg = (
        "skip_col_converter is just a flag to skip conversion for a column "
        "and should not be actually called."
    )
    raise NotImplementedError(msg)
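
The sentinel pattern can be sketched without Polars: the caller compares callables by identity and simply skips the marker (all names in this sketch are hypothetical):

```python
def skip_col_converter(_col):
    """Marker only; never actually called."""
    raise NotImplementedError("skip_col_converter is a marker and should not be called")

converter_map = {
    "email": str.lower,
    "user_id": skip_col_converter,  # marker: leave this column untouched
}

row = {"email": "Alice@Example.COM", "user_id": "U1"}
converted = {
    name: value
    if converter_map[name] is skip_col_converter  # identity check skips the marker
    else converter_map[name](value)
    for name, value in row.items()
}
print(converted)  # {'email': 'alice@example.com', 'user_id': 'U1'}
```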
sort_cols()

Sort the DataFrame by columns and directions from get_sort_cols().

Applies multi-column sorting with per-column sort direction (ascending or descending) as defined in get_sort_cols().

Source code in src/winiutils/core/data/dataframe/cleaning.py
def sort_cols(self) -> None:
    """Sort the DataFrame by columns and directions from get_sort_cols().

    Applies multi-column sorting with per-column sort direction
    (ascending or descending) as defined in ``get_sort_cols()``.
    """
    cols, desc = zip(*self.get_sort_cols(), strict=True)
    if not cols:
        return
    self.df = self.df.sort(cols, descending=desc)
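
Per-column sort direction can be reproduced with successive stable sorts applied from the least-significant key to the most-significant (a plain-Python sketch; Polars does this in a single sort call):

```python
def sort_rows(rows, sort_spec):
    """sort_spec: list of (key, descending) pairs, most significant first."""
    for key, descending in reversed(sort_spec):  # stable sorts compose
        rows = sorted(rows, key=lambda r: r[key], reverse=descending)
    return rows

rows = [
    {"user": "a", "score": 1},
    {"user": "b", "score": 2},
    {"user": "a", "score": 2},
]
# user ascending, then score descending within each user
result = sort_rows(rows, [("user", False), ("score", True)])
print(result)
```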
standard_convert_cols()

Apply standard conversions based on data type.

Automatically applies the following transformations
  • pl.Utf8 columns: Strip leading/trailing whitespace
  • pl.Float64 columns: Round to precision using Kahan summation
Source code in src/winiutils/core/data/dataframe/cleaning.py
def standard_convert_cols(self) -> None:
    """Apply standard conversions based on data type.

    Automatically applies the following transformations:
        - ``pl.Utf8`` columns: Strip leading/trailing whitespace
        - ``pl.Float64`` columns: Round to precision using Kahan summation
    """
    for col_name, dtype in self.get_col_dtype_map().items():
        if dtype == pl.Utf8:
            converter = self.strip_col
        elif dtype == pl.Float64:
            converter = self.round_col
        else:
            continue
        self.df = self.df.with_columns(
            pl.col(col_name).map_batches(converter, return_dtype=dtype)
        )
strip_col(col) classmethod

Remove leading and trailing whitespace from a string column.

Parameters:

Name Type Description Default
col Series

Polars Series of string type (pl.Utf8).

required

Returns:

Type Description
Series

Series with leading and trailing whitespace removed from each value.

Source code in src/winiutils/core/data/dataframe/cleaning.py
@classmethod
def strip_col(cls, col: pl.Series) -> pl.Series:
    """Remove leading and trailing whitespace from a string column.

    Args:
        col: Polars Series of string type (``pl.Utf8``).

    Returns:
        Series with leading and trailing whitespace removed from each value.
    """
    return col.str.strip_chars()

structures

Data structures utilities package.

This package provides utilities for working with common data structures:

Modules:

Name Description
dicts

Dictionary manipulation utilities.

text

Text and string processing utilities.

dicts

Dictionary manipulation utilities.

This module provides utility functions for common dictionary operations such as reversing key-value pairs.

Example

>>> from winiutils.core.data.structures.dicts import reverse_dict
>>> original = {"a": 1, "b": 2}
>>> reverse_dict(original)
{1: 'a', 2: 'b'}

reverse_dict(d)

Reverse the keys and values of a dictionary.

Creates a new dictionary where the original values become keys and the original keys become values.

Parameters:

Name Type Description Default
d dict[Any, Any]

The dictionary to reverse. Values must be hashable to serve as keys in the resulting dictionary.

required

Returns:

Type Description
dict[Any, Any]

A new dictionary with keys and values swapped from the original.

Raises:

Type Description
TypeError

If any value in the input dictionary is not hashable.

Warning

If the original dictionary contains duplicate values, only the last key-value pair for each value will be preserved in the result.

Example

>>> reverse_dict({"name": "alice", "role": "admin"})
{'alice': 'name', 'admin': 'role'}

Source code in src/winiutils/core/data/structures/dicts.py
def reverse_dict(d: dict[Any, Any]) -> dict[Any, Any]:
    """Reverse the keys and values of a dictionary.

    Creates a new dictionary where the original values become keys and
    the original keys become values.

    Args:
        d: The dictionary to reverse. Values must be hashable to serve
            as keys in the resulting dictionary.

    Returns:
        A new dictionary with keys and values swapped from the original.

    Raises:
        TypeError: If any value in the input dictionary is not hashable.

    Warning:
        If the original dictionary contains duplicate values, only the last
        key-value pair for each value will be preserved in the result.

    Example:
        >>> reverse_dict({"name": "alice", "role": "admin"})
        {'alice': 'name', 'admin': 'role'}
    """
    return {v: k for k, v in d.items()}
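
The duplicate-value caveat from the warning above is easy to demonstrate:

```python
def reverse_dict(d):
    """Swap keys and values; later keys win when values collide."""
    return {v: k for k, v in d.items()}

# distinct values round-trip cleanly
print(reverse_dict({"a": 1, "b": 2}))  # {1: 'a', 2: 'b'}

# duplicate values: only the last key-value pair survives
print(reverse_dict({"a": 1, "b": 1}))  # {1: 'b'}
```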
text

Text processing utilities package.

This package provides utilities for text and string manipulation:

Modules:

Name Description
string

String manipulation, hashing, XML parsing, and input utilities.

string_

String manipulation utilities for text processing.

This module provides utility functions for common string operations including
  • User input with timeout constraints
  • XML namespace extraction
  • String truncation for logging
  • Deterministic hash generation
Example

>>> from winiutils.core.data.structures.text.string_ import (
...     value_to_truncated_string,
...     get_reusable_hash,
... )
>>> value_to_truncated_string("Hello, World!", max_length=10)
'Hello,...'
>>> get_reusable_hash("test")  # doctest: +ELLIPSIS
'9f86d08...'

ask_for_input_with_timeout(prompt, timeout)

Request user input with a timeout constraint.

Displays a prompt to the user and waits for input. If the user does not provide input within the specified timeout period, a TimeoutError is raised.

This function uses multiprocessing internally to enforce the timeout, so it spawns a separate process for the input operation.

Parameters:

Name Type Description Default
prompt str

The text prompt to display to the user before waiting for input.

required
timeout int

Maximum time in seconds to wait for user input.

required

Returns:

Type Description
str

The user's input as a stripped string.

Raises:

Type Description
TimeoutError

If the user doesn't provide input within the timeout period.

Example
This example would block waiting for input
response = ask_for_input_with_timeout("Enter name: ", timeout=30)
Source code in src/winiutils/core/data/structures/text/string_.py
def ask_for_input_with_timeout(prompt: str, timeout: int) -> str:
    """Request user input with a timeout constraint.

    Displays a prompt to the user and waits for input. If the user does not
    provide input within the specified timeout period, a TimeoutError is raised.

    This function uses multiprocessing internally to enforce the timeout,
    so it spawns a separate process for the input operation.

    Args:
        prompt: The text prompt to display to the user before waiting for input.
        timeout: Maximum time in seconds to wait for user input.

    Returns:
        The user's input as a stripped string.

    Raises:
        multiprocessing.TimeoutError: If the user doesn't provide input within
            the timeout period.

    Example:
        >>> # This example would block waiting for input
        >>> # response = ask_for_input_with_timeout("Enter name: ", timeout=30)
    """

    @cancel_on_timeout(timeout, "Input not given within the timeout")
    def give_input() -> str:
        return input(prompt)

    user_input: str = give_input()

    return user_input
find_xml_namespaces(xml)

Extract namespace declarations from XML content.

Parses the XML content and extracts all namespace prefix-to-URI mappings, excluding the default (empty prefix) namespace. Uses defusedxml for safe XML parsing to prevent XML-based attacks.

Parameters:

Name Type Description Default
xml str | StringIO

XML content as a string or StringIO object. If a string is provided, it will be wrapped in a StringIO internally.

required

Returns:

Type Description
dict[str, str]

A dictionary mapping namespace prefixes to their URIs. The default

dict[str, str]

namespace (empty prefix) is excluded from the result.

Example

>>> xml_content = '''<?xml version="1.0"?>
... <root xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
... </root>'''
>>> find_xml_namespaces(xml_content)
{'soap': 'http://schemas.xmlsoap.org/soap/envelope/'}

Source code in src/winiutils/core/data/structures/text/string_.py
def find_xml_namespaces(xml: str | StringIO) -> dict[str, str]:
    """Extract namespace declarations from XML content.

    Parses the XML content and extracts all namespace prefix-to-URI mappings,
    excluding the default (empty prefix) namespace. Uses defusedxml for safe
    XML parsing to prevent XML-based attacks.

    Args:
        xml: XML content as a string or StringIO object. If a string is
            provided, it will be wrapped in a StringIO internally.

    Returns:
        A dictionary mapping namespace prefixes to their URIs. The default
        namespace (empty prefix) is excluded from the result.

    Example:
        >>> xml_content = '''<?xml version="1.0"?>
        ... <root xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
        ... </root>'''
        >>> find_xml_namespaces(xml_content)
        {'soap': 'http://schemas.xmlsoap.org/soap/envelope/'}
    """
    if not isinstance(xml, StringIO):
        xml = StringIO(xml)
    # Extract the namespaces from the root tag
    namespaces_: dict[str, str] = {}
    iter_ns = DefusedElementTree.iterparse(xml, events=["start-ns"])
    for _, namespace_data in iter_ns:
        prefix, uri = namespace_data
        namespaces_[str(prefix)] = str(uri)

    namespaces_.pop("", None)

    return namespaces_
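
The same start-ns scan works with the standard library's `xml.etree.ElementTree`; the module itself uses defusedxml for safety, so this sketch only illustrates the mechanism:

```python
from io import StringIO
from xml.etree import ElementTree


def find_xml_namespaces(xml: str) -> dict[str, str]:
    """Collect prefix-to-URI mappings, dropping the default namespace."""
    namespaces: dict[str, str] = {}
    # "start-ns" events fire once per xmlns declaration, before the element
    for _, (prefix, uri) in ElementTree.iterparse(StringIO(xml), events=["start-ns"]):
        namespaces[str(prefix)] = str(uri)
    namespaces.pop("", None)  # exclude the default (unprefixed) namespace
    return namespaces


xml = (
    '<root xmlns="http://example.com/default" '
    'xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"/>'
)
print(find_xml_namespaces(xml))  # {'soap': 'http://schemas.xmlsoap.org/soap/envelope/'}
```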
get_reusable_hash(value)

Generate a deterministic SHA-256 hash for any object.

Creates a consistent hash based on the string representation of the given value. Unlike Python's built-in hash() function, this hash is:
  • Deterministic across Python sessions
  • Consistent across different machines
  • Suitable for caching, deduplication, or identification

Parameters:

Name Type Description Default
value object

Any object to hash. The object's __str__ method is used to generate the string representation for hashing.

required

Returns:

Type Description
str

A 64-character hexadecimal string representation of the SHA-256 hash.

Note

Two objects with the same string representation will produce the same hash, even if they are different types or have different internal state.

Example

>>> get_reusable_hash("test")
'9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08'
>>> get_reusable_hash({"key": "value"})  # doctest: +ELLIPSIS
'...'

Source code in src/winiutils/core/data/structures/text/string_.py
def get_reusable_hash(value: object) -> str:
    """Generate a deterministic SHA-256 hash for any object.

    Creates a consistent hash based on the string representation of the given
    value. Unlike Python's built-in ``hash()`` function, this hash is:
        - Deterministic across Python sessions
        - Consistent across different machines
        - Suitable for caching, deduplication, or identification

    Args:
        value: Any object to hash. The object's ``__str__`` method is used
            to generate the string representation for hashing.

    Returns:
        A 64-character hexadecimal string representation of the SHA-256 hash.

    Note:
        Two objects with the same string representation will produce the same
        hash, even if they are different types or have different internal state.

    Example:
        >>> get_reusable_hash("test")
        '9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08'
        >>> get_reusable_hash({"key": "value"})  # doctest: +ELLIPSIS
        '...'
    """
    value_str = str(value)
    return hashlib.sha256(value_str.encode("utf-8")).hexdigest()
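The determinism is simply SHA-256 over `str(value)`, so the note above (same string representation, same hash) can be checked directly:

```python
import hashlib


def get_reusable_hash(value: object) -> str:
    """SHA-256 hex digest of the object's string representation."""
    return hashlib.sha256(str(value).encode("utf-8")).hexdigest()


digest = get_reusable_hash("test")
print(digest)  # 9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08

# same string representation -> same hash, regardless of type
print(get_reusable_hash(12) == get_reusable_hash("12"))  # True
```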
value_to_truncated_string(value, max_length)

Convert any value to a string and truncate if it exceeds the maximum length.

Useful for logging or displaying values where space is limited. The string is truncated at word boundaries when possible, with "..." appended to indicate truncation.

Parameters:

Name Type Description Default
value object

Any object to convert to a string representation.

required
max_length int

Maximum length of the resulting string, including the ellipsis placeholder if truncation occurs.

required

Returns:

Type Description
str

The string representation of the value, truncated to max_length

str

characters if necessary with "..." as the truncation indicator.

Example

>>> value_to_truncated_string("Hello, World!", max_length=10)
'Hello,...'
>>> value_to_truncated_string([1, 2, 3], max_length=20)
'[1, 2, 3]'

Source code in src/winiutils/core/data/structures/text/string_.py
def value_to_truncated_string(value: object, max_length: int) -> str:
    """Convert any value to a string and truncate if it exceeds the maximum length.

    Useful for logging or displaying values where space is limited. The string
    is truncated at word boundaries when possible, with "..." appended to
    indicate truncation.

    Args:
        value: Any object to convert to a string representation.
        max_length: Maximum length of the resulting string, including the
            ellipsis placeholder if truncation occurs.

    Returns:
        The string representation of the value, truncated to max_length
        characters if necessary with "..." as the truncation indicator.

    Example:
        >>> value_to_truncated_string("Hello, World!", max_length=10)
        'Hello,...'
        >>> value_to_truncated_string([1, 2, 3], max_length=20)
        '[1, 2, 3]'
    """
    string = str(value)
    return textwrap.shorten(string, width=max_length, placeholder="...")
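
Since the implementation delegates to `textwrap.shorten`, the doctest above can be reproduced standalone; note the word-boundary behaviour, which can leave the result noticeably shorter than max_length:

```python
import textwrap


def value_to_truncated_string(value: object, max_length: int) -> str:
    """Stringify and truncate at a word boundary, marking truncation with '...'."""
    return textwrap.shorten(str(value), width=max_length, placeholder="...")


print(value_to_truncated_string("Hello, World!", 10))  # Hello,...
print(value_to_truncated_string([1, 2, 3], 20))        # [1, 2, 3]
```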

iterating

Iteration utilities package.

This package provides utilities for iteration and parallel processing:

Modules:

Name Description
iterate

Basic iteration utilities for working with iterables.

concurrent

Concurrent processing utilities for multiprocessing and multithreading.

concurrent

Concurrent processing utilities package.

This package provides utilities for parallel execution using processes and threads:

Modules:

Name Description
concurrent

Core concurrent processing infrastructure and shared utilities.

multiprocessing

CPU-bound parallel processing using multiprocessing pools.

multithreading

I/O-bound parallel processing using thread pools.

concurrent

Concurrent processing utilities for parallel execution.

This module provides core functions for concurrent processing using both multiprocessing and multithreading approaches. It includes utilities for handling timeouts, managing process pools, and organizing parallel execution of functions.

The main entry point is concurrent_loop(), which provides a unified interface for both threading and multiprocessing execution.

Example

>>> from winiutils.core.iterating.concurrent.concurrent import concurrent_loop
>>> def square(x):
...     return x * x
>>> results = concurrent_loop(
...     threading=True,
...     process_function=square,
...     process_args=[[1], [2], [3]],
...     process_args_len=3,
... )
>>> results
[1, 4, 9]

concurrent_loop(*, threading, process_function, process_args, process_args_static=None, deepcopy_static_args=None, process_args_len=1)

Execute a function concurrently with multiple argument sets.

Core function that provides a unified interface for both multiprocessing and multithreading execution. This is the internal implementation used by multiprocess_loop() and multithread_loop().

Parameters:

Name Type Description Default
threading bool

Whether to use threading (True) or multiprocessing (False). Use threading for I/O-bound tasks and multiprocessing for CPU-bound tasks.

required
process_function Callable[..., Any]

The function to execute concurrently. Must be pickle-able for multiprocessing.

required
process_args Iterable[Iterable[Any]]

Iterable of argument lists for each parallel call. Each inner iterable contains the arguments for one function call.

required
process_args_static Iterable[Any] | None

Optional constant arguments to append to each call. These are shared across all calls without copying. Defaults to None.

None
deepcopy_static_args Iterable[Any] | None

Optional arguments that should be deep-copied for each process. Use this for mutable objects that should not be shared between processes. Defaults to None.

None
process_args_len int

Length of process_args. Used for progress bar and worker pool sizing. Defaults to 1.

1

Returns:

Type Description
list[Any]

List of results from the function executions, in the original

list[Any]

submission order.

Note

This function is not meant to be used directly. Use multiprocess_loop() for CPU-bound tasks or multithread_loop() for I/O-bound tasks instead.

Source code in src/winiutils/core/iterating/concurrent/concurrent.py
def concurrent_loop(  # noqa: PLR0913
    *,
    threading: bool,
    process_function: Callable[..., Any],
    process_args: Iterable[Iterable[Any]],
    process_args_static: Iterable[Any] | None = None,
    deepcopy_static_args: Iterable[Any] | None = None,
    process_args_len: int = 1,
) -> list[Any]:
    """Execute a function concurrently with multiple argument sets.

    Core function that provides a unified interface for both multiprocessing
    and multithreading execution. This is the internal implementation used
    by ``multiprocess_loop()`` and ``multithread_loop()``.

    Args:
        threading: Whether to use threading (True) or multiprocessing
            (False). Use threading for I/O-bound tasks and multiprocessing
            for CPU-bound tasks.
        process_function: The function to execute concurrently. Must be
            pickle-able for multiprocessing.
        process_args: Iterable of argument lists for each parallel call.
            Each inner iterable contains the arguments for one function
            call.
        process_args_static: Optional constant arguments to append to each
            call. These are shared across all calls without copying.
            Defaults to None.
        deepcopy_static_args: Optional arguments that should be deep-copied
            for each process. Use this for mutable objects that should not
            be shared between processes. Defaults to None.
        process_args_len: Length of ``process_args``. Used for progress bar
            and worker pool sizing. Defaults to 1.

    Returns:
        List of results from the function executions, in the original
        submission order.

    Note:
        This function is not meant to be used directly. Use
        ``multiprocess_loop()`` for CPU-bound tasks or ``multithread_loop()``
        for I/O-bound tasks instead.
    """
    from winiutils.core.iterating.concurrent.multiprocessing import (  # noqa: PLC0415  # avoid circular import
        get_spwan_pool,
    )
    from winiutils.core.iterating.concurrent.multithreading import (  # noqa: PLC0415  # avoid circular import
        imap_unordered,
    )

    process_args_len = get_len_with_default(process_args, process_args_len)
    process_args = generate_process_args(
        process_function=process_function,
        process_args=process_args,
        process_args_static=process_args_static,
        deepcopy_static_args=deepcopy_static_args,
    )
    max_workers = find_max_pools(threads=threading, process_args_len=process_args_len)
    pool_executor = (
        ThreadPoolExecutor(max_workers=max_workers)
        if threading
        else get_spwan_pool(processes=max_workers)
    )
    with pool_executor as pool:
        map_func: Callable[[Callable[..., Any], Iterable[Any]], Any]

        if process_args_len == 1:
            map_func = map
        elif threading:
            pool = cast("ThreadPoolExecutor", pool)
            map_func = partial(imap_unordered, pool)
        else:
            pool = cast("Pool", pool)
            map_func = pool.imap_unordered

        results = map_func(get_order_and_func_result, process_args)

        return get_multiprocess_results_with_tqdm(
            results=results,
            process_func=process_function,
            process_args_len=process_args_len,
            threads=threading,
        )
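The order guarantee despite `imap_unordered` comes from tagging each call with its submission index and reordering afterwards; a stdlib sketch of that pattern (`threaded_loop` is a hypothetical name, not part of the library):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def threaded_loop(func, arg_lists):
    """Run func over arg_lists in threads, returning results in submission order."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        # tag each call with its submission index via the future->index map
        futures = [pool.submit(func, *args) for args in arg_lists]
        indexed = {fut: i for i, fut in enumerate(futures)}
        results = []
        for fut in as_completed(futures):  # completion order is arbitrary
            results.append((indexed[fut], fut.result()))
    results.sort(key=lambda pair: pair[0])  # restore submission order
    return [value for _, value in results]


out = threaded_loop(lambda x: x * x, [[1], [2], [3]])
print(out)  # [1, 4, 9]
```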
find_max_pools(*, threads, process_args_len=None)

Determine optimal number of workers for parallel execution.

Calculates the maximum number of worker processes or threads based on system resources, currently active tasks, and the number of items to process.

Parameters:

Name Type Description Default
threads bool

Whether to use threading (True) or multiprocessing (False). Threading allows up to 4x CPU count, while multiprocessing is limited to CPU count.

required
process_args_len int | None

Number of items to process in parallel. If provided, the result will not exceed this value.

None

Returns:

Type Description
int

Maximum number of worker processes or threads to use. Always at

int

least 1.

Note

For threading, the maximum is cpu_count * 4 minus active threads. For multiprocessing, the maximum is cpu_count minus active child processes.

Source code in src/winiutils/core/iterating/concurrent/concurrent.py
def find_max_pools(
    *,
    threads: bool,
    process_args_len: int | None = None,
) -> int:
    """Determine optimal number of workers for parallel execution.

    Calculates the maximum number of worker processes or threads based on
    system resources, currently active tasks, and the number of items to
    process.

    Args:
        threads: Whether to use threading (True) or multiprocessing (False).
            Threading allows up to 4x CPU count, while multiprocessing is
            limited to CPU count.
        process_args_len: Number of items to process in parallel. If
            provided, the result will not exceed this value.

    Returns:
        Maximum number of worker processes or threads to use. Always at
        least 1.

    Note:
        For threading, the maximum is ``cpu_count * 4`` minus active threads.
        For multiprocessing, the maximum is ``cpu_count`` minus active
        child processes.
    """
    # size the pool from CPU count and currently active tasks
    cpu_count = os.cpu_count() or 1
    if threads:
        active_tasks = threading.active_count()
        max_tasks = cpu_count * 4
    else:
        active_tasks = len(multiprocessing.active_children())
        max_tasks = cpu_count

    available_tasks = max_tasks - active_tasks
    max_pools = (
        min(available_tasks, process_args_len) if process_args_len else available_tasks
    )
    max_pools = max(max_pools, 1)

    logger.info(
        "Multi%s with max_pools: %s",
        "threading" if threads else "processing",
        max_pools,
    )

    return max_pools
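The worker-count formula can be isolated from the system calls for clarity; this pure sketch injects `cpu_count` and `active_tasks` as parameters (hypothetical signature) instead of querying `os` and `threading`:

```python
def find_max_pools(*, cpu_count, active_tasks, threads, process_args_len=None):
    """Pure version of the worker-count formula, with inputs injected for determinism."""
    max_tasks = cpu_count * 4 if threads else cpu_count  # threads allow 4x CPU count
    available = max_tasks - active_tasks
    if process_args_len:
        available = min(available, process_args_len)  # never exceed the item count
    return max(available, 1)  # always at least one worker


# 8 CPUs, 2 live threads, 5 items to process -> limited by the item count
print(find_max_pools(cpu_count=8, active_tasks=2, threads=True, process_args_len=5))  # 5

# heavily loaded machine still gets one worker
print(find_max_pools(cpu_count=4, active_tasks=10, threads=False))  # 1
```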
generate_process_args(*, process_function, process_args, process_args_static=None, deepcopy_static_args=None)

Prepare arguments for multiprocessing or multithreading execution.

Converts input arguments into a format suitable for parallel processing, organizing them for efficient unpacking during execution.

The function performs the following transformations
  1. Prepends the process function and order index to each argument tuple
  2. Appends static arguments to each call
  3. Deep-copies specified arguments for each call (for mutable objects)

Parameters:

Name Type Description Default
process_function Callable[..., Any]

The function to be executed in parallel.

required
process_args Iterable[Iterable[Any]]

Iterable of argument lists for each parallel call. Each inner iterable contains the arguments for one function call.

required
process_args_static Iterable[Any] | None

Optional constant arguments to append to each call. These are shared across all calls without copying.

None
deepcopy_static_args Iterable[Any] | None

Optional arguments that should be deep-copied for each process. Use this for mutable objects that should not be shared between processes.

None

Yields:

Type Description
Any

Tuples formatted as: (process_function, order_index, *args,

...

static_args, deepcopied_args)

Example

>>> def add(a, b, c):
...     return a + b + c
>>> args = generate_process_args(
...     process_function=add,
...     process_args=[[1], [2]],
...     process_args_static=[10],
... )
>>> list(args)
[(add, 0, 1, 10), (add, 1, 2, 10)]

Source code in src/winiutils/core/iterating/concurrent/concurrent.py
def generate_process_args(
    *,
    process_function: Callable[..., Any],
    process_args: Iterable[Iterable[Any]],
    process_args_static: Iterable[Any] | None = None,
    deepcopy_static_args: Iterable[Any] | None = None,
) -> Generator[tuple[Any, ...], None, None]:
    """Prepare arguments for multiprocessing or multithreading execution.

    Converts input arguments into a format suitable for parallel processing,
    organizing them for efficient unpacking during execution.

    The function performs the following transformations:
        1. Prepends the process function and order index to each argument tuple
        2. Appends static arguments to each call
        3. Deep-copies specified arguments for each call (for mutable objects)

    Args:
        process_function: The function to be executed in parallel.
        process_args: Iterable of argument lists for each parallel call.
            Each inner iterable contains the arguments for one function call.
        process_args_static: Optional constant arguments to append to each
            call. These are shared across all calls without copying.
        deepcopy_static_args: Optional arguments that should be deep-copied
            for each process. Use this for mutable objects that should not
            be shared between processes.

    Yields:
        Tuples formatted as: (process_function, order_index, *args,
        *static_args, *deepcopied_args)

    Example:
        >>> def add(a, b, c):
        ...     return a + b + c
        >>> args = generate_process_args(
        ...     process_function=add,
        ...     process_args=[[1], [2]],
        ...     process_args_static=[10],
        ... )
        >>> list(args)
        [(add, 0, 1, 10), (add, 1, 2, 10)]
    """
    process_args_static = (
        () if process_args_static is None else tuple(process_args_static)
    )
    deepcopy_static_args = (
        () if deepcopy_static_args is None else tuple(deepcopy_static_args)
    )
    for order, process_arg in enumerate(process_args):
        yield (
            process_function,
            order,
            *process_arg,
            *process_args_static,
            *(
                deepcopy(deepcopy_static_arg)
                for deepcopy_static_arg in deepcopy_static_args
            ),
        )
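The key property of `deepcopy_static_args` is that each yielded tuple gets its own copy of every mutable object. The sketch below mirrors the generator's loop body in a hypothetical stand-in (`gen` is illustrative, not the library function) to show that each call receives an independent copy:

```python
from copy import deepcopy


# Minimal sketch mirroring the yielded-tuple layout of the generator above,
# to show that deep-copied static args are independent objects per call.
def gen(func, process_args, static=(), deep=()):
    for order, args in enumerate(process_args):
        yield (func, order, *args, *static, *(deepcopy(d) for d in deep))


def add(a, b):
    return a + b


shared = {"hits": 0}
calls = list(gen(add, [[1], [2]], static=(10,), deep=(shared,)))
# Each call got its own copy of the mutable dict:
calls[0][-1]["hits"] = 99
print(calls[1][-1])  # {'hits': 0}
```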
get_multiprocess_results_with_tqdm(results, process_func, process_args_len, *, threads)

Collect parallel execution results with progress tracking.

Processes results from parallel execution with a tqdm progress bar and ensures they are returned in the original submission order.

Parameters:

- `results` (`Iterable[Any]`, required): Iterable of (order_index, result) tuples from parallel execution.
- `process_func` (`Callable[..., Any]`, required): The function that was executed in parallel. Used for the progress bar description.
- `process_args_len` (`int`, required): Total number of items being processed. Used for the progress bar total.
- `threads` (`bool`, required): Whether threading (True) or multiprocessing (False) was used. Affects the progress bar description.

Returns:

- `list[Any]`: List of results from parallel execution, sorted by original submission order.

Source code in src/winiutils/core/iterating/concurrent/concurrent.py
def get_multiprocess_results_with_tqdm(
    results: Iterable[Any],
    process_func: Callable[..., Any],
    process_args_len: int,
    *,
    threads: bool,
) -> list[Any]:
    """Collect parallel execution results with progress tracking.

    Processes results from parallel execution with a tqdm progress bar and
    ensures they are returned in the original submission order.

    Args:
        results: Iterable of (order_index, result) tuples from parallel
            execution.
        process_func: The function that was executed in parallel. Used for
            the progress bar description.
        process_args_len: Total number of items being processed. Used for
            the progress bar total.
        threads: Whether threading (True) or multiprocessing (False) was
            used. Affects the progress bar description.

    Returns:
        List of results from parallel execution, sorted by original
        submission order.
    """
    results = tqdm(
        results,
        total=process_args_len,
        desc=f"Multi{'threading' if threads else 'processing'} {process_func}",
        unit=f" {'threads' if threads else 'processes'}",
    )
    results_list = list(results)
    # results list is a tuple of (order, result),
    # so we need to sort it by order to get the original order
    results_list = sorted(results_list, key=lambda x: x[0])
    # now extract the results from the tuple
    return [result[1] for result in results_list]
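The reordering step at the end of this function can be shown in isolation. A minimal sketch (plain Python, no tqdm or pool involved): parallel backends may hand back `(order, result)` pairs in any order, and sorting by the order index restores submission order before the payloads are extracted.

```python
# Standalone sketch of the reordering step above.
completed = [(2, "c"), (0, "a"), (1, "b")]  # pairs arrive in completion order
ordered = [result for _, result in sorted(completed, key=lambda x: x[0])]
print(ordered)  # ['a', 'b', 'c']
```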
get_order_and_func_result(func_order_args)

Execute a function and return its result with order index.

Helper function used with imap_unordered to execute a function with argument unpacking while preserving the original order of results.

Parameters:

- `func_order_args` (`tuple[Any, ...]`, required): Tuple containing:
    - The function to be executed
    - The order index (int)
    - The arguments for the function (unpacked)

Returns:

- `tuple[int, Any]`: A tuple of (order_index, result) where order_index is the original position and result is the function's return value.

Source code in src/winiutils/core/iterating/concurrent/concurrent.py
def get_order_and_func_result(
    func_order_args: tuple[Any, ...],
) -> tuple[int, Any]:
    """Execute a function and return its result with order index.

    Helper function used with ``imap_unordered`` to execute a function with
    argument unpacking while preserving the original order of results.

    Args:
        func_order_args: Tuple containing:
            - The function to be executed
            - The order index (int)
            - The arguments for the function (unpacked)

    Returns:
        A tuple of (order_index, result) where order_index is the original
        position and result is the function's return value.
    """
    function, order, *args = func_order_args
    return order, function(*args)
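The unpacking pattern in this helper works with any callable packed at the head of the tuple. A minimal standalone sketch (the name `order_and_result` is illustrative, not the library function):

```python
# Standalone sketch of the unpacking pattern above: first element is the
# function, second is its submission index, the rest are its arguments.
def order_and_result(func_order_args):
    function, order, *args = func_order_args
    return order, function(*args)


print(order_and_result((max, 3, 10, 7)))  # (3, 10)
```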
multiprocessing

Multiprocessing utilities for CPU-bound parallel execution.

This module provides functions for parallel processing using Python's multiprocessing module. It includes utilities for handling timeouts, managing process pools, and organizing parallel execution of CPU-bound functions.

Use multiprocessing for CPU-bound tasks that benefit from true parallelism by bypassing Python's Global Interpreter Lock (GIL).

Example

>>> from winiutils.core.iterating.concurrent.multiprocessing import (
...     multiprocess_loop,
... )
>>> def square(x):
...     return x * x
>>> results = multiprocess_loop(
...     process_function=square,
...     process_args=[[1], [2], [3]],
...     process_args_len=3,
... )
>>> results
[1, 4, 9]

cancel_on_timeout(seconds, message)

Create a decorator that cancels function execution on timeout.

Creates a wrapper that executes the decorated function in a separate process and terminates it if execution time exceeds the specified timeout.

Parameters:

- `seconds` (`float`, required): Maximum execution time in seconds before timeout.
- `message` (`str`, required): Error message to include in the warning log when timeout occurs.

Returns:

- `Callable[..., Any]`: A decorator function that wraps the target function with timeout functionality.

Raises:

- `multiprocessing.TimeoutError`: When function execution exceeds the timeout.

Warning

Only works with functions that are pickle-able. This means it may not work as a decorator on methods or closures. Instead, use it as a wrapper function::

my_func = cancel_on_timeout(
    seconds=2,
    message="Test timeout",
)(my_func)
Example

>>> def slow_function():
...     import time
...     time.sleep(10)
...     return "done"
>>> timed_func = cancel_on_timeout(
...     seconds=1,
...     message="Function took too long",
... )(slow_function)
>>> timed_func()  # Raises TimeoutError after 1 second

Source code in src/winiutils/core/iterating/concurrent/multiprocessing.py
def cancel_on_timeout(seconds: float, message: str) -> Callable[..., Any]:
    """Create a decorator that cancels function execution on timeout.

    Creates a wrapper that executes the decorated function in a separate
    process and terminates it if execution time exceeds the specified
    timeout.

    Args:
        seconds: Maximum execution time in seconds before timeout.
        message: Error message to include in the warning log when timeout
            occurs.

    Returns:
        A decorator function that wraps the target function with timeout
        functionality.

    Raises:
        multiprocessing.TimeoutError: When function execution exceeds the
            timeout.

    Warning:
        Only works with functions that are pickle-able. This means it may
        not work as a decorator on methods or closures. Instead, use it as
        a wrapper function::

            my_func = cancel_on_timeout(
                seconds=2,
                message="Test timeout",
            )(my_func)

    Example:
        >>> def slow_function():
        ...     import time
        ...     time.sleep(10)
        ...     return "done"
        >>> timed_func = cancel_on_timeout(
        ...     seconds=1,
        ...     message="Function took too long",
        ... )(slow_function)
        >>> timed_func()  # Raises TimeoutError after 1 second
    """

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        def wrapper(*args: object, **kwargs: object) -> object:
            spawn_pool = get_spwan_pool(processes=1)
            with spawn_pool as pool:
                async_result = pool.apply_async(func, args, kwargs)
                try:
                    return async_result.get(timeout=seconds)
                except multiprocessing.TimeoutError:
                    logger.warning(
                        "%s -> Execution exceeded %s seconds: %s",
                        func,
                        seconds,
                        message,
                    )
                    raise
                finally:
                    pool.terminate()  # Ensure the worker process is killed
                    pool.join()  # Wait for cleanup

        return wrapper

    return decorator
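The pickle-ability caveat in the Warning above can be checked directly with the standard library. A minimal sketch: module-level functions pickle by qualified name, while lambdas (and most closures) have no importable name and fail, which is why the decorator cannot wrap them.

```python
import pickle


def top_level():
    # Defined at module level, so pickle can reference it by qualified name.
    return "ok"


# Round-trips fine: only a reference to the function is serialized.
restored = pickle.loads(pickle.dumps(top_level))
print(restored())  # ok

try:
    pickle.dumps(lambda: "ok")  # no importable name -> pickling fails
except (pickle.PicklingError, AttributeError) as exc:
    print(type(exc).__name__)
```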
get_spwan_pool(*args, **kwargs)

Create a multiprocessing pool with the spawn context.

Uses the 'spawn' start method which creates a fresh Python interpreter process. This is safer than 'fork' as it avoids issues with inherited file descriptors and locks.

Parameters:

- `*args` (`Any`, default `()`): Positional arguments passed to Pool constructor.
- `**kwargs` (`Any`, default `{}`): Keyword arguments passed to Pool constructor.

Returns:

- `Pool`: A multiprocessing Pool configured with the spawn context.

Example

>>> pool = get_spwan_pool(processes=4)
>>> with pool:
...     results = pool.map(square, [1, 2, 3])

Source code in src/winiutils/core/iterating/concurrent/multiprocessing.py
def get_spwan_pool(*args: Any, **kwargs: Any) -> Pool:
    """Create a multiprocessing pool with the spawn context.

    Uses the 'spawn' start method which creates a fresh Python interpreter
    process. This is safer than 'fork' as it avoids issues with inherited
    file descriptors and locks.

    Args:
        *args: Positional arguments passed to ``Pool`` constructor.
        **kwargs: Keyword arguments passed to ``Pool`` constructor.

    Returns:
        A multiprocessing Pool configured with the spawn context.

    Example:
        >>> pool = get_spwan_pool(processes=4)
        >>> with pool:
        ...     results = pool.map(square, [1, 2, 3])
    """
    return multiprocessing.get_context("spawn").Pool(*args, **kwargs)
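The spawn context can be inspected without starting any worker processes, which makes the fork/spawn distinction easy to verify. A minimal sketch using only the standard library:

```python
import multiprocessing

# get_context returns a context object whose Pool() behaves like
# multiprocessing.Pool, but workers are started as fresh interpreters.
ctx = multiprocessing.get_context("spawn")
print(ctx.get_start_method())  # spawn
```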
multiprocess_loop(process_function, process_args, process_args_static=None, deepcopy_static_args=None, process_args_len=1)

Execute a function in parallel using multiprocessing Pool.

Executes the given function with the provided arguments in parallel using multiprocessing Pool, which is suitable for CPU-bound tasks.

Parameters:

- `process_function` (`Callable[..., Any]`, required): The function to execute in parallel. Must be pickle-able.
- `process_args` (`Iterable[Iterable[Any]]`, required): Iterable of argument lists for each parallel call. Each inner iterable contains the arguments for one function call. Example: `[(1, 2), (3, 4), (5, 6)]`
- `process_args_static` (`Iterable[Any] | None`, default `None`): Optional constant arguments to append to each call. These are shared across all calls without copying.
- `deepcopy_static_args` (`Iterable[Any] | None`, default `None`): Optional arguments that should be deep-copied for each process. Use this for mutable objects that should not be shared between processes.
- `process_args_len` (`int`, default `1`): Length of process_args. Used for progress bar and worker pool sizing.

Returns:

- `list[Any]`: List of results from the function executions, in the original submission order.

Note
  • Use multiprocessing for CPU-bound tasks as it bypasses Python's GIL by creating separate processes.
  • Multiprocessing is not safe for mutable objects; use deepcopy_static_args for mutable data.
  • If ConnectionErrors occur during debugging, try reducing the number of processes.
  • All functions and arguments must be pickle-able.
Example

>>> def add(a, b, c):
...     return a + b + c
>>> results = multiprocess_loop(
...     process_function=add,
...     process_args=[[1, 2], [3, 4]],
...     process_args_static=[10],
...     process_args_len=2,
... )
>>> results
[13, 17]

Source code in src/winiutils/core/iterating/concurrent/multiprocessing.py
def multiprocess_loop(
    process_function: Callable[..., Any],
    process_args: Iterable[Iterable[Any]],
    process_args_static: Iterable[Any] | None = None,
    deepcopy_static_args: Iterable[Any] | None = None,
    process_args_len: int = 1,
) -> list[Any]:
    """Execute a function in parallel using multiprocessing Pool.

    Executes the given function with the provided arguments in parallel
    using multiprocessing Pool, which is suitable for CPU-bound tasks.

    Args:
        process_function: The function to execute in parallel. Must be
            pickle-able.
        process_args: Iterable of argument lists for each parallel call.
            Each inner iterable contains the arguments for one function
            call. Example: ``[(1, 2), (3, 4), (5, 6)]``
        process_args_static: Optional constant arguments to append to each
            call. These are shared across all calls without copying.
            Defaults to None.
        deepcopy_static_args: Optional arguments that should be deep-copied
            for each process. Use this for mutable objects that should not
            be shared between processes. Defaults to None.
        process_args_len: Length of ``process_args``. Used for progress bar
            and worker pool sizing. Defaults to 1.

    Returns:
        List of results from the function executions, in the original
        submission order.

    Note:
        - Use multiprocessing for CPU-bound tasks as it bypasses Python's
          GIL by creating separate processes.
        - Multiprocessing is not safe for mutable objects; use
          ``deepcopy_static_args`` for mutable data.
        - If ConnectionErrors occur during debugging, try reducing the
          number of processes.
        - All functions and arguments must be pickle-able.

    Example:
        >>> def add(a, b, c):
        ...     return a + b + c
        >>> results = multiprocess_loop(
        ...     process_function=add,
        ...     process_args=[[1, 2], [3, 4]],
        ...     process_args_static=[10],
        ...     process_args_len=2,
        ... )
        >>> results
        [13, 17]
    """
    return concurrent_loop(
        threading=False,
        process_function=process_function,
        process_args=process_args,
        process_args_static=process_args_static,
        deepcopy_static_args=deepcopy_static_args,
        process_args_len=process_args_len,
    )
multithreading

Multithreading utilities for I/O-bound parallel execution.

This module provides functions for parallel processing using thread pools. It includes utilities for handling thread pools, managing futures, and organizing parallel execution of I/O-bound tasks.

Use multithreading for I/O-bound tasks such as network requests, file operations, or database queries where threads spend most of their time waiting for external resources.

Example

>>> from winiutils.core.iterating.concurrent.multithreading import (
...     multithread_loop,
... )
>>> def fetch_url(url):
...     import requests
...     return requests.get(url).status_code
>>> results = multithread_loop(
...     process_function=fetch_url,
...     process_args=[["https://example.com"], ["https://google.com"]],
...     process_args_len=2,
... )

get_future_results_as_completed(futures)

Yield future results as they complete.

Yields results from futures in the order they complete, not in the order they were submitted. This allows processing results as soon as they're available.

Parameters:

- `futures` (`Iterable[Future[Any]]`, required): Iterable of Future objects to get results from.

Yields:

- `Any`: The result of each completed future.

Example

>>> with ThreadPoolExecutor() as executor:
...     futures = [executor.submit(square, i) for i in range(3)]
...     for result in get_future_results_as_completed(futures):
...         print(result)

Source code in src/winiutils/core/iterating/concurrent/multithreading.py
def get_future_results_as_completed(
    futures: Iterable[Future[Any]],
) -> Generator[Any, None, None]:
    """Yield future results as they complete.

    Yields results from futures in the order they complete, not in the
    order they were submitted. This allows processing results as soon as
    they're available.

    Args:
        futures: Iterable of Future objects to get results from.

    Yields:
        The result of each completed future.

    Example:
        >>> with ThreadPoolExecutor() as executor:
        ...     futures = [executor.submit(square, i) for i in range(3)]
        ...     for result in get_future_results_as_completed(futures):
        ...         print(result)
    """
    for future in as_completed(futures):
        yield future.result()
imap_unordered(executor, func, iterable)

Apply a function to each item in an iterable in parallel.

Similar to multiprocessing.Pool.imap_unordered(), this function submits all items to the executor and yields results as they complete.

Parameters:

- `executor` (`ThreadPoolExecutor`, required): ThreadPoolExecutor to use for parallel execution.
- `func` (`Callable[..., Any]`, required): Function to apply to each item in the iterable.
- `iterable` (`Iterable[Any]`, required): Iterable of items to apply the function to.

Yields:

- `Any`: Results of applying the function to each item, in completion order (not submission order).

Example

>>> with ThreadPoolExecutor(max_workers=4) as executor:
...     for result in imap_unordered(executor, square, [1, 2, 3]):
...         print(result)

Source code in src/winiutils/core/iterating/concurrent/multithreading.py
def imap_unordered(
    executor: ThreadPoolExecutor,
    func: Callable[..., Any],
    iterable: Iterable[Any],
) -> Generator[Any, None, None]:
    """Apply a function to each item in an iterable in parallel.

    Similar to ``multiprocessing.Pool.imap_unordered()``, this function
    submits all items to the executor and yields results as they complete.

    Args:
        executor: ThreadPoolExecutor to use for parallel execution.
        func: Function to apply to each item in the iterable.
        iterable: Iterable of items to apply the function to.

    Yields:
        Results of applying the function to each item, in completion order
        (not submission order).

    Example:
        >>> with ThreadPoolExecutor(max_workers=4) as executor:
        ...     for result in imap_unordered(executor, square, [1, 2, 3]):
        ...         print(result)
    """
    results = [executor.submit(func, item) for item in iterable]
    yield from get_future_results_as_completed(results)
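The submit-then-consume pattern underlying this function can be run self-contained with just the standard library. The sketch below uses `concurrent.futures` directly rather than the library's helpers; results may arrive out of submission order, so they are sorted only for the printed summary.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def square(x):
    return x * x


# Submit everything up front, then consume futures as they complete.
with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(square, i) for i in range(5)]
    results = [f.result() for f in as_completed(futures)]

print(sorted(results))  # [0, 1, 4, 9, 16]
```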
multithread_loop(process_function, process_args, process_args_static=None, process_args_len=1)

Execute a function in parallel using ThreadPoolExecutor.

Executes the given function with the provided arguments in parallel using ThreadPoolExecutor, which is suitable for I/O-bound tasks.

Parameters:

- `process_function` (`Callable[..., Any]`, required): The function to execute in parallel.
- `process_args` (`Iterable[Iterable[Any]]`, required): Iterable of argument lists for each parallel call. Each inner iterable contains the arguments for one function call. Example: `[["url1"], ["url2"], ["url3"]]`
- `process_args_static` (`Iterable[Any] | None`, default `None`): Optional constant arguments to append to each call. These are shared across all calls.
- `process_args_len` (`int`, default `1`): Length of process_args. Used for progress bar and worker pool sizing.

Returns:

- `list[Any]`: List of results from the function executions, in the original submission order.

Note

Use ThreadPoolExecutor for I/O-bound tasks (network requests, file I/O, database queries). For CPU-bound tasks, use multiprocess_loop() instead.

Example

>>> def download(url, timeout):
...     import requests
...     return requests.get(url, timeout=timeout).text
>>> results = multithread_loop(
...     process_function=download,
...     process_args=[["https://example.com"], ["https://google.com"]],
...     process_args_static=[30],  # 30 second timeout for all
...     process_args_len=2,
... )

Source code in src/winiutils/core/iterating/concurrent/multithreading.py
def multithread_loop(
    process_function: Callable[..., Any],
    process_args: Iterable[Iterable[Any]],
    process_args_static: Iterable[Any] | None = None,
    process_args_len: int = 1,
) -> list[Any]:
    """Execute a function in parallel using ThreadPoolExecutor.

    Executes the given function with the provided arguments in parallel
    using ThreadPoolExecutor, which is suitable for I/O-bound tasks.

    Args:
        process_function: The function to execute in parallel.
        process_args: Iterable of argument lists for each parallel call.
            Each inner iterable contains the arguments for one function
            call. Example: ``[["url1"], ["url2"], ["url3"]]``
        process_args_static: Optional constant arguments to append to each
            call. These are shared across all calls. Defaults to None.
        process_args_len: Length of ``process_args``. Used for progress bar
            and worker pool sizing. Defaults to 1.

    Returns:
        List of results from the function executions, in the original
        submission order.

    Note:
        Use ThreadPoolExecutor for I/O-bound tasks (network requests, file
        I/O, database queries). For CPU-bound tasks, use
        ``multiprocess_loop()`` instead.

    Example:
        >>> def download(url, timeout):
        ...     import requests
        ...     return requests.get(url, timeout=timeout).text
        >>> results = multithread_loop(
        ...     process_function=download,
        ...     process_args=[["https://example.com"], ["https://google.com"]],
        ...     process_args_static=[30],  # 30 second timeout for all
        ...     process_args_len=2,
        ... )
    """
    return concurrent_loop(
        threading=True,
        process_function=process_function,
        process_args=process_args,
        process_args_static=process_args_static,
        process_args_len=process_args_len,
    )

iterate

Iterating utilities for handling iterables.

This module provides utility functions for working with iterables, including getting the length of an iterable with a default value. These utilities help with iterable operations and manipulations.

get_len_with_default(iterable, default=None)

Get the length of an iterable, falling back to a default value.

Attempts to get the length of an iterable using len(). If the iterable doesn't support len() (e.g., generators), returns the provided default value instead.

Parameters:

- `iterable` (`Iterable[Any]`, required): The iterable to get the length of.
- `default` (`int | None`, default `None`): Default value to return if the iterable doesn't support len(). If None and the iterable doesn't support len(), a TypeError is raised.

Returns:

- `int`: The length of the iterable, or the default value if the iterable doesn't support len().

Raises:

- `TypeError`: If the iterable doesn't support len() and no default value is provided.

Example

>>> get_len_with_default([1, 2, 3])
3
>>> get_len_with_default((x for x in range(10)), default=10)
10

Source code in src/winiutils/core/iterating/iterate.py
def get_len_with_default(iterable: Iterable[Any], default: int | None = None) -> int:
    """Get the length of an iterable, falling back to a default value.

    Attempts to get the length of an iterable using ``len()``. If the
    iterable doesn't support ``len()`` (e.g., generators), returns the
    provided default value instead.

    Args:
        iterable: The iterable to get the length of.
        default: Default value to return if the iterable doesn't support
            ``len()``. If None and the iterable doesn't support ``len()``,
            a TypeError is raised.

    Returns:
        The length of the iterable, or the default value if the iterable
        doesn't support ``len()``.

    Raises:
        TypeError: If the iterable doesn't support ``len()`` and no default
            value is provided.

    Example:
        >>> get_len_with_default([1, 2, 3])
        3
        >>> get_len_with_default((x for x in range(10)), default=10)
        10
    """
    try:
        return len(iterable)  # type: ignore[arg-type]  # ty:ignore[invalid-argument-type]
    except TypeError as e:
        if default is None:
            msg = "Can't get length of iterable and no default value provided"
            raise TypeError(msg) from e
        return default

oop

Object-oriented programming utilities package.

This package provides utilities for OOP patterns and class behavior modification:

Modules:

Name Description
mixins

Metaclasses and mixins for automatic method logging and instrumentation.

mixins

Mixins and metaclasses package.

This package provides metaclasses and mixins for class behavior modification:

Modules:

Name Description
meta

Metaclasses for automatic method logging and instrumentation.

mixin

Mixin classes that provide composable behavior extensions.

meta

Metaclass utilities for class behavior modification and enforcement.

This module provides metaclasses that can be used to modify class behavior at creation time. These metaclasses can be used individually or combined to create classes with enhanced capabilities and stricter implementation requirements.

Example

>>> from winiutils.core.oop.mixins.meta import ABCLoggingMeta
>>> class MyClass(metaclass=ABCLoggingMeta):
...     def my_method(self, x):
...         return x * 2
>>> obj = MyClass()
>>> obj.my_method(5)  # Logs: "MyClass - Calling my_method with ..."
10

ABCLoggingMeta

Bases: ABCMeta

Metaclass that automatically adds logging to class methods.

Wraps non-magic methods with a logging decorator that tracks method calls, arguments, execution time, and return values. Includes rate limiting to prevent log flooding.

This metaclass extends ABCMeta, so classes using it can also define abstract methods.

Example

>>> class Calculator(metaclass=ABCLoggingMeta):
...     def add(self, a, b):
...         return a + b
>>> calc = Calculator()
>>> calc.add(2, 3)  # Logs method call and result
5

Note
  • Magic methods (__init__, __str__, etc.) are not logged.
  • Properties are not logged.
  • Logging is rate-limited to once per second per method to prevent log flooding.
Source code in src/winiutils/core/oop/mixins/meta.py
class ABCLoggingMeta(ABCMeta):
    """Metaclass that automatically adds logging to class methods.

    Wraps non-magic methods with a logging decorator that tracks method
    calls, arguments, execution time, and return values. Includes rate
    limiting to prevent log flooding.

    This metaclass extends ``ABCMeta``, so classes using it can also define
    abstract methods.

    Attributes:
        Inherits all attributes from ``ABCMeta``.

    Example:
        >>> class Calculator(metaclass=ABCLoggingMeta):
        ...     def add(self, a, b):
        ...         return a + b
        >>> calc = Calculator()
        >>> calc.add(2, 3)  # Logs method call and result
        5

    Note:
        - Magic methods (``__init__``, ``__str__``, etc.) are not logged.
        - Properties are not logged.
        - Logging is rate-limited to once per second per method to prevent
          log flooding.
    """

    def __new__(
        mcs: type["ABCLoggingMeta"],
        name: str,
        bases: tuple[type, ...],
        dct: dict[str, Any],
    ) -> "ABCLoggingMeta":
        """Create a new class with logging-wrapped methods.

        Intercepts class creation to wrap all non-magic methods with logging
        functionality. Handles regular methods, class methods, and static
        methods.

        Args:
            mcs: The metaclass instance.
            name: The name of the class being created.
            bases: The base classes of the class being created.
            dct: The attribute dictionary of the class being created.

        Returns:
            A new class with logging functionality added to its methods.
        """
        # Wrap all callables of the class with a logging wrapper

        for attr_name, attr_value in dct.items():
            if mcs.is_loggable_method(attr_value):
                if isinstance(attr_value, classmethod):
                    wrapped_method = mcs.wrap_with_logging(
                        func=attr_value.__func__, class_name=name, call_times={}
                    )
                    dct[attr_name] = classmethod(wrapped_method)
                elif isinstance(attr_value, staticmethod):
                    wrapped_method = mcs.wrap_with_logging(
                        func=attr_value.__func__, class_name=name, call_times={}
                    )
                    dct[attr_name] = staticmethod(wrapped_method)
                else:
                    dct[attr_name] = mcs.wrap_with_logging(
                        func=attr_value, class_name=name, call_times={}
                    )

        return super().__new__(mcs, name, bases, dct)

    @staticmethod
    def is_loggable_method(method: Callable[..., Any]) -> bool:
        """Determine if a method should have logging applied.

        Checks whether a method is a valid candidate for logging. Methods
        are logged if they are callable, have a name, and are not magic
        methods (those starting with ``__``).

        Args:
            method: The method to check.

        Returns:
            True if the method should be wrapped with logging, False
            otherwise.

        Note:
            Properties are not logged as they are not callable in the
            traditional sense and cause issues with the wrapping mechanism.
        """
        return (
            is_func(method)  # must be a method-like attribute
            and not getattr(method, "__name__", "__").startswith(
                "__"
            )  # must not be a magic method
        )

    @staticmethod
    def wrap_with_logging(
        func: Callable[..., Any],
        class_name: str,
        call_times: dict[str, float],
    ) -> Callable[..., Any]:
        """Wrap a function with logging functionality.

        Creates a wrapper that logs method calls, arguments, execution time,
        and return values. Includes rate limiting to prevent excessive
        logging (once per second per method).

        Args:
            func: The function to wrap with logging.
            class_name: The name of the class containing the function. Used
                in log messages.
            call_times: Dictionary to track when methods were last called.
                Used for rate limiting. This dictionary is mutated by the
                wrapper.

        Returns:
            A wrapped function with logging capabilities.

        Note:
            Arguments and return values are truncated to 20 characters in
            log messages to prevent excessively long log lines.
        """
        time_time = time.time  # Cache the time.time function for performance

        @wraps(func)
        def wrapper(*args: object, **kwargs: object) -> object:
            # call_times as a dictionary to store the call times of the function
            # we only log if the time since the last call is greater than the threshold
            # this is to avoid spamming the logs

            func_name = func.__name__  # ty:ignore[unresolved-attribute]

            threshold = 1

            last_call_time = call_times.get(func_name, 0)

            current_time = time_time()

            do_logging = (current_time - last_call_time) > threshold

            max_log_length = 20

            if do_logging:
                args_str = value_to_truncated_string(
                    value=args, max_length=max_log_length
                )

                kwargs_str = value_to_truncated_string(
                    value=kwargs, max_length=max_log_length
                )

                logger.info(
                    "%s - Calling %s with %s and %s",
                    class_name,
                    func_name,
                    args_str,
                    kwargs_str,
                )

            # Execute the function and return the result

            result = func(*args, **kwargs)

            if do_logging:
                duration = time_time() - current_time

                result_str = value_to_truncated_string(
                    value=result, max_length=max_log_length
                )

                logger.info(
                    "%s - %s finished with %s seconds -> returning %s",
                    class_name,
                    func_name,
                    duration,
                    result_str,
                )

            # save the call time for the next call

            call_times[func_name] = current_time

            return result

        return wrapper
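The rate-limiting behavior described above can be modeled in isolation. The following is a minimal, self-contained sketch of the same logic (the names `make_rate_limited_log` and the returned `logged` list are illustrative, not part of the library):

```python
import time


def make_rate_limited_log(threshold: float = 1.0):
    """Return a log callback that records a name at most once per threshold."""
    call_times: dict[str, float] = {}  # last call time per method name
    logged: list[str] = []             # names that were actually logged

    def log(name: str) -> None:
        now = time.time()
        # Log only if enough time has passed since the previous call
        if now - call_times.get(name, 0) > threshold:
            logged.append(name)
        # The timestamp is updated on every call, so a method invoked in a
        # tight loop is logged once and then stays suppressed for as long
        # as successive calls arrive within the threshold.
        call_times[name] = now

    return log, logged


log, logged = make_rate_limited_log(threshold=1.0)
log("transform")
log("transform")  # immediate second call: suppressed
```

Note that because the timestamp is refreshed unconditionally, a continuously hammered method is logged once and then silenced until a gap longer than the threshold occurs, matching the wrapper above.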
__new__(mcs, name, bases, dct)

Create a new class with logging-wrapped methods.

Intercepts class creation to wrap all non-magic methods with logging functionality. Handles regular methods, class methods, and static methods.

Parameters:

    mcs (type[ABCLoggingMeta]): The metaclass instance. Required.
    name (str): The name of the class being created. Required.
    bases (tuple[type, ...]): The base classes of the class being created. Required.
    dct (dict[str, Any]): The attribute dictionary of the class being created. Required.

Returns:

    ABCLoggingMeta: A new class with logging functionality added to its methods.

Source code in src/winiutils/core/oop/mixins/meta.py
def __new__(
    mcs: type["ABCLoggingMeta"],
    name: str,
    bases: tuple[type, ...],
    dct: dict[str, Any],
) -> "ABCLoggingMeta":
    """Create a new class with logging-wrapped methods.

    Intercepts class creation to wrap all non-magic methods with logging
    functionality. Handles regular methods, class methods, and static
    methods.

    Args:
        mcs: The metaclass instance.
        name: The name of the class being created.
        bases: The base classes of the class being created.
        dct: The attribute dictionary of the class being created.

    Returns:
        A new class with logging functionality added to its methods.
    """
    # Wrap all callables of the class with a logging wrapper

    for attr_name, attr_value in dct.items():
        if mcs.is_loggable_method(attr_value):
            if isinstance(attr_value, classmethod):
                wrapped_method = mcs.wrap_with_logging(
                    func=attr_value.__func__, class_name=name, call_times={}
                )
                dct[attr_name] = classmethod(wrapped_method)
            elif isinstance(attr_value, staticmethod):
                wrapped_method = mcs.wrap_with_logging(
                    func=attr_value.__func__, class_name=name, call_times={}
                )
                dct[attr_name] = staticmethod(wrapped_method)
            else:
                dct[attr_name] = mcs.wrap_with_logging(
                    func=attr_value, class_name=name, call_times={}
                )

    return super().__new__(mcs, name, bases, dct)
is_loggable_method(method) staticmethod

Determine if a method should have logging applied.

Checks whether a method is a valid candidate for logging. Methods are logged if they are callable, have a name, and are not magic methods (those starting with __).

Parameters:

    method (Callable[..., Any]): The method to check. Required.

Returns:

    bool: True if the method should be wrapped with logging, False otherwise.

Note

Properties are not logged as they are not callable in the traditional sense and cause issues with the wrapping mechanism.

Source code in src/winiutils/core/oop/mixins/meta.py
@staticmethod
def is_loggable_method(method: Callable[..., Any]) -> bool:
    """Determine if a method should have logging applied.

    Checks whether a method is a valid candidate for logging. Methods
    are logged if they are callable, have a name, and are not magic
    methods (those starting with ``__``).

    Args:
        method: The method to check.

    Returns:
        True if the method should be wrapped with logging, False
        otherwise.

    Note:
        Properties are not logged as they are not callable in the
        traditional sense and cause issues with the wrapping mechanism.
    """
    return (
        is_func(method)  # must be a method-like attribute
        and not getattr(method, "__name__", "__").startswith(
            "__"
        )  # must not be a magic method
    )
wrap_with_logging(func, class_name, call_times) staticmethod

Wrap a function with logging functionality.

Creates a wrapper that logs method calls, arguments, execution time, and return values. Includes rate limiting to prevent excessive logging (once per second per method).

Parameters:

    func (Callable[..., Any]): The function to wrap with logging. Required.
    class_name (str): The name of the class containing the function. Used in log messages. Required.
    call_times (dict[str, float]): Dictionary to track when methods were last called. Used for rate limiting. This dictionary is mutated by the wrapper. Required.

Returns:

    Callable[..., Any]: A wrapped function with logging capabilities.

Note

Arguments and return values are truncated to 20 characters in log messages to prevent excessively long log lines.

Source code in src/winiutils/core/oop/mixins/meta.py
@staticmethod
def wrap_with_logging(
    func: Callable[..., Any],
    class_name: str,
    call_times: dict[str, float],
) -> Callable[..., Any]:
    """Wrap a function with logging functionality.

    Creates a wrapper that logs method calls, arguments, execution time,
    and return values. Includes rate limiting to prevent excessive
    logging (once per second per method).

    Args:
        func: The function to wrap with logging.
        class_name: The name of the class containing the function. Used
            in log messages.
        call_times: Dictionary to track when methods were last called.
            Used for rate limiting. This dictionary is mutated by the
            wrapper.

    Returns:
        A wrapped function with logging capabilities.

    Note:
        Arguments and return values are truncated to 20 characters in
        log messages to prevent excessively long log lines.
    """
    time_time = time.time  # Cache the time.time function for performance

    @wraps(func)
    def wrapper(*args: object, **kwargs: object) -> object:
        # call_times as a dictionary to store the call times of the function
        # we only log if the time since the last call is greater than the threshold
        # this is to avoid spamming the logs

        func_name = func.__name__  # ty:ignore[unresolved-attribute]

        threshold = 1

        last_call_time = call_times.get(func_name, 0)

        current_time = time_time()

        do_logging = (current_time - last_call_time) > threshold

        max_log_length = 20

        if do_logging:
            args_str = value_to_truncated_string(
                value=args, max_length=max_log_length
            )

            kwargs_str = value_to_truncated_string(
                value=kwargs, max_length=max_log_length
            )

            logger.info(
                "%s - Calling %s with %s and %s",
                class_name,
                func_name,
                args_str,
                kwargs_str,
            )

        # Execute the function and return the result

        result = func(*args, **kwargs)

        if do_logging:
            duration = time_time() - current_time

            result_str = value_to_truncated_string(
                value=result, max_length=max_log_length
            )

            logger.info(
                "%s - %s finished with %s seconds -> returning %s",
                class_name,
                func_name,
                duration,
                result_str,
            )

        # save the call time for the next call

        call_times[func_name] = current_time

        return result

    return wrapper
mixin

Mixin utilities for class composition and behavior extension.

This module provides mixin classes that facilitate class composition through the mixin pattern. It includes utilities for automatic method logging with performance tracking.

These utilities help create robust class hierarchies with built-in logging capabilities without requiring explicit decorator usage.

Example

>>> from winiutils.core.oop.mixins.mixin import ABCLoggingMixin
>>> class MyService(ABCLoggingMixin):
...     def process(self, data):
...         return data.upper()
>>> service = MyService()
>>> service.process("hello")  # Logs method call automatically
'HELLO'

ABCLoggingMixin

Mixin class that provides automatic method logging.

This mixin can be used as a base class for any class that needs automatic method logging with performance tracking. All non-magic methods will be automatically wrapped with logging functionality.

The logging includes
  • Method name and class name
  • Arguments passed to the method (truncated)
  • Execution time
  • Return value (truncated)

Inheriting from this class is equivalent to using ABCLoggingMeta as the metaclass, but provides a cleaner inheritance syntax.

Example

>>> class DataProcessor(ABCLoggingMixin):
...     def transform(self, data):
...         return [x * 2 for x in data]
>>> processor = DataProcessor()
>>> processor.transform([1, 2, 3])
[2, 4, 6]

Logs: "DataProcessor - Calling transform with ..."
Logs: "DataProcessor - transform finished with 0.001 seconds -> ..."

Note
  • Magic methods (__init__, __str__, etc.) are not logged.
  • Logging is rate-limited to once per second per method.
  • This class can be combined with abstract methods since it uses ABCLoggingMeta which extends ABCMeta.
Source code in src/winiutils/core/oop/mixins/mixin.py
class ABCLoggingMixin(metaclass=ABCLoggingMeta):
    """Mixin class that provides automatic method logging.

    This mixin can be used as a base class for any class that needs
    automatic method logging with performance tracking. All non-magic
    methods will be automatically wrapped with logging functionality.

    The logging includes:
        - Method name and class name
        - Arguments passed to the method (truncated)
        - Execution time
        - Return value (truncated)

    Inheriting from this class is equivalent to using ``ABCLoggingMeta``
    as the metaclass, but provides a cleaner inheritance syntax.

    Example:
        >>> class DataProcessor(ABCLoggingMixin):
        ...     def transform(self, data):
        ...         return [x * 2 for x in data]
        >>> processor = DataProcessor()
        >>> processor.transform([1, 2, 3])
        [2, 4, 6]
        # Logs: "DataProcessor - Calling transform with ..."
        # Logs: "DataProcessor - transform finished with 0.001 seconds -> ..."

    Note:
        - Magic methods (``__init__``, ``__str__``, etc.) are not logged.
        - Logging is rate-limited to once per second per method.
        - This class can be combined with abstract methods since it uses
          ``ABCLoggingMeta`` which extends ``ABCMeta``.
    """

security

Security utilities package.

This package provides utilities for encryption and secure storage:

Modules:

Name Description
cryptography

AES-GCM encryption and decryption utilities.

keyring

OS keyring integration for secure key storage and retrieval.

cryptography

Cryptography utilities for secure data handling.

This module provides utility functions for working with cryptography, including encryption and decryption using AES-GCM (Galois/Counter Mode). AES-GCM provides both confidentiality and authenticity guarantees.

Example

>>> from cryptography.hazmat.primitives.ciphers.aead import AESGCM
>>> from winiutils.core.security.cryptography import (
...     encrypt_with_aes_gcm,
...     decrypt_with_aes_gcm,
... )
>>> key = AESGCM.generate_key(bit_length=256)
>>> aes_gcm = AESGCM(key)
>>> encrypted = encrypt_with_aes_gcm(aes_gcm, b"secret message")
>>> decrypted = decrypt_with_aes_gcm(aes_gcm, encrypted)
>>> decrypted
b'secret message'

IV_LEN = 12 module-attribute

int: Length of the initialization vector (IV) in bytes.

AES-GCM requires a 12-byte (96-bit) IV for optimal performance and security.
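Since the 12-byte IV is prepended to the cipher output, the encrypted payload layout is IV (12 bytes) + ciphertext + 16-byte GCM tag (the tag is appended by AESGCM itself). A dependency-free sketch of this framing, with placeholder bytes standing in for real `os.urandom`/cipher output:

```python
IV_LEN = 12  # AES-GCM IV length in bytes, as defined in the module

# Stand-ins for os.urandom(IV_LEN) and the AESGCM.encrypt(...) output
fake_iv = bytes(range(IV_LEN))
fake_ct = b"ciphertext-plus-16-byte-tag"

# encrypt_with_aes_gcm prepends the IV to the ciphertext ...
payload = fake_iv + fake_ct

# ... and decrypt_with_aes_gcm splits it back off before decrypting
iv, ct = payload[:IV_LEN], payload[IV_LEN:]
assert iv == fake_iv and ct == fake_ct
```

Because the IV travels with the ciphertext, no separate IV storage is needed; only the key must be kept secret.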

decrypt_with_aes_gcm(aes_gcm, data, aad=None)

Decrypt data that was encrypted with AES-GCM.

Extracts the IV from the beginning of the encrypted data and uses it to decrypt the ciphertext. Also verifies the authentication tag to ensure data integrity.

Parameters:

    aes_gcm (AESGCM): An initialized AESGCM cipher instance with the same key used for encryption. Required.
    data (bytes): The encrypted data with IV prepended (as returned by encrypt_with_aes_gcm). Required.
    aad (bytes | None): Optional additional authenticated data. Must match the AAD used during encryption, or decryption will fail. Default: None.

Returns:

    bytes: The decrypted plaintext data.

Raises:

    InvalidTag: If the authentication tag is invalid, indicating the data was tampered with or the wrong key/AAD was used.

Example

>>> key = AESGCM.generate_key(bit_length=256)
>>> aes_gcm = AESGCM(key)
>>> encrypted = encrypt_with_aes_gcm(aes_gcm, b"secret")
>>> decrypt_with_aes_gcm(aes_gcm, encrypted)
b'secret'

Source code in src/winiutils/core/security/cryptography.py
def decrypt_with_aes_gcm(
    aes_gcm: AESGCM, data: bytes, aad: bytes | None = None
) -> bytes:
    """Decrypt data that was encrypted with AES-GCM.

    Extracts the IV from the beginning of the encrypted data and uses it
    to decrypt the ciphertext. Also verifies the authentication tag to
    ensure data integrity.

    Args:
        aes_gcm: An initialized AESGCM cipher instance with the same key
            used for encryption.
        data: The encrypted data with IV prepended (as returned by
            ``encrypt_with_aes_gcm``).
        aad: Optional additional authenticated data. Must match the AAD
            used during encryption, or decryption will fail.

    Returns:
        The decrypted plaintext data.

    Raises:
        cryptography.exceptions.InvalidTag: If the authentication tag is
            invalid, indicating the data was tampered with or the wrong
            key/AAD was used.

    Example:
        >>> key = AESGCM.generate_key(bit_length=256)
        >>> aes_gcm = AESGCM(key)
        >>> encrypted = encrypt_with_aes_gcm(aes_gcm, b"secret")
        >>> decrypt_with_aes_gcm(aes_gcm, encrypted)
        b'secret'
    """
    iv, encrypted = data[:IV_LEN], data[IV_LEN:]
    return aes_gcm.decrypt(iv, encrypted, aad)
encrypt_with_aes_gcm(aes_gcm, data, aad=None)

Encrypt data using AES-GCM with a random initialization vector.

Encrypts the provided data using AES-GCM and prepends a randomly generated 12-byte IV to the ciphertext. The IV is required for decryption.

Parameters:

    aes_gcm (AESGCM): An initialized AESGCM cipher instance. Required.
    data (bytes): The plaintext data to encrypt. Required.
    aad (bytes | None): Optional additional authenticated data. This data is not encrypted but is authenticated, meaning any tampering will be detected during decryption. Default: None.

Returns:

    bytes: The encrypted data with the IV prepended. Format: IV + ciphertext.

Example

>>> key = AESGCM.generate_key(bit_length=256)
>>> aes_gcm = AESGCM(key)
>>> encrypted = encrypt_with_aes_gcm(aes_gcm, b"hello world")
>>> len(encrypted) > len(b"hello world")  # IV + ciphertext + tag
True

Note

A new random IV is generated for each encryption call. Never reuse an IV with the same key.

Source code in src/winiutils/core/security/cryptography.py
def encrypt_with_aes_gcm(
    aes_gcm: AESGCM, data: bytes, aad: bytes | None = None
) -> bytes:
    """Encrypt data using AES-GCM with a random initialization vector.

    Encrypts the provided data using AES-GCM and prepends a randomly
    generated 12-byte IV to the ciphertext. The IV is required for
    decryption.

    Args:
        aes_gcm: An initialized AESGCM cipher instance.
        data: The plaintext data to encrypt.
        aad: Optional additional authenticated data. This data is not
            encrypted but is authenticated, meaning any tampering will
            be detected during decryption.

    Returns:
        The encrypted data with the IV prepended. Format: ``IV + ciphertext``.

    Example:
        >>> key = AESGCM.generate_key(bit_length=256)
        >>> aes_gcm = AESGCM(key)
        >>> encrypted = encrypt_with_aes_gcm(aes_gcm, b"hello world")
        >>> len(encrypted) > len(b"hello world")  # IV + ciphertext + tag
        True

    Note:
        A new random IV is generated for each encryption call. Never reuse
        an IV with the same key.
    """
    iv = os.urandom(IV_LEN)
    encrypted = aes_gcm.encrypt(iv, data, aad)
    return iv + encrypted

keyring

Keyring utilities for secure storage and retrieval of secrets.

This module provides utility functions for working with the OS keyring, including getting and creating cryptographic keys. Keys are stored securely in the system's credential manager (e.g., macOS Keychain, Windows Credential Manager, or Linux Secret Service).

When running in GitHub Actions, a plaintext keyring is used instead since the system keyring is not available.

Example

>>> from winiutils.core.security.keyring import get_or_create_fernet
>>> fernet, key_bytes = get_or_create_fernet("my_app", "encryption_key")
>>> encrypted = fernet.encrypt(b"secret data")
>>> fernet.decrypt(encrypted)
b'secret data'

get_key_as_str(service_name, username, key_class)

Retrieve a key from the keyring as a base64-encoded string.

Parameters:

    service_name (str): The service name used for keyring storage. Required.
    username (str): The username/key identifier within the service. Required.
    key_class (Callable[[bytes], T]): The key class used to modify the service name. Required.

Returns:

    str | None: The base64-encoded key string, or None if the key doesn't exist.

Source code in src/winiutils/core/security/keyring.py
def get_key_as_str[T](
    service_name: str, username: str, key_class: Callable[[bytes], T]
) -> str | None:
    """Retrieve a key from the keyring as a base64-encoded string.

    Args:
        service_name: The service name used for keyring storage.
        username: The username/key identifier within the service.
        key_class: The key class used to modify the service name.

    Returns:
        The base64-encoded key string, or None if the key doesn't exist.
    """
    service_name = make_service_name(service_name, key_class)
    return keyring.get_password(service_name, username)
get_or_create_aes_gcm(service_name, username)

Get or create an AES-GCM encryption key from the keyring.

Retrieves an existing AES-GCM key from the system keyring, or generates a new 256-bit key if it doesn't exist. The key is stored in the keyring for future use.

Parameters:

    service_name (str): The service name to use for keyring storage. This identifies your application. Required.
    username (str): The username/key identifier within the service. Required.

Returns:

    tuple[AESGCM, bytes]: A tuple of (AESGCM instance, raw key bytes). The AESGCM instance can be used with encrypt_with_aes_gcm and decrypt_with_aes_gcm.

Example

>>> aes_gcm, key = get_or_create_aes_gcm("my_app", "aes_key")
>>> from winiutils.core.security.cryptography import encrypt_with_aes_gcm
>>> encrypted = encrypt_with_aes_gcm(aes_gcm, b"secret")

Source code in src/winiutils/core/security/keyring.py
def get_or_create_aes_gcm(service_name: str, username: str) -> tuple[AESGCM, bytes]:
    """Get or create an AES-GCM encryption key from the keyring.

    Retrieves an existing AES-GCM key from the system keyring, or generates
    a new 256-bit key if it doesn't exist. The key is stored in the keyring
    for future use.

    Args:
        service_name: The service name to use for keyring storage. This
            identifies your application.
        username: The username/key identifier within the service.

    Returns:
        A tuple of (AESGCM instance, raw key bytes). The AESGCM instance
        can be used with ``encrypt_with_aes_gcm`` and ``decrypt_with_aes_gcm``.

    Example:
        >>> aes_gcm, key = get_or_create_aes_gcm("my_app", "aes_key")
        >>> from winiutils.core.security.cryptography import encrypt_with_aes_gcm
        >>> encrypted = encrypt_with_aes_gcm(aes_gcm, b"secret")
    """
    return get_or_create_key(
        service_name, username, AESGCM, lambda: AESGCM.generate_key(bit_length=256)
    )
get_or_create_fernet(service_name, username)

Get or create a Fernet symmetric encryption key from the keyring.

Retrieves an existing Fernet key from the system keyring, or generates a new one if it doesn't exist. The key is stored in the keyring for future use.

Parameters:

    service_name (str): The service name to use for keyring storage. This identifies your application. Required.
    username (str): The username/key identifier within the service. Required.

Returns:

    tuple[Fernet, bytes]: A tuple of (Fernet instance, raw key bytes). The Fernet instance can be used directly for encryption/decryption.

Example

>>> fernet, key = get_or_create_fernet("my_app", "main_key")
>>> encrypted = fernet.encrypt(b"hello")
>>> fernet.decrypt(encrypted)
b'hello'

Source code in src/winiutils/core/security/keyring.py
def get_or_create_fernet(service_name: str, username: str) -> tuple[Fernet, bytes]:
    """Get or create a Fernet symmetric encryption key from the keyring.

    Retrieves an existing Fernet key from the system keyring, or generates
    a new one if it doesn't exist. The key is stored in the keyring for
    future use.

    Args:
        service_name: The service name to use for keyring storage. This
            identifies your application.
        username: The username/key identifier within the service.

    Returns:
        A tuple of (Fernet instance, raw key bytes). The Fernet instance
        can be used directly for encryption/decryption.

    Example:
        >>> fernet, key = get_or_create_fernet("my_app", "main_key")
        >>> encrypted = fernet.encrypt(b"hello")
        >>> fernet.decrypt(encrypted)
        b'hello'
    """
    return get_or_create_key(service_name, username, Fernet, Fernet.generate_key)
get_or_create_key(service_name, username, key_class, generate_key_func)

Get or create a cryptographic key from the keyring.

Generic function that retrieves an existing key from the system keyring, or generates a new one using the provided generator function if it doesn't exist.

Parameters:

    service_name (str): The service name to use for keyring storage. Required.
    username (str): The username/key identifier within the service. Required.
    key_class (Callable[[bytes], T]): A callable that takes raw key bytes and returns a cipher instance (e.g., Fernet, AESGCM). Required.
    generate_key_func (Callable[..., bytes]): A callable that generates new raw key bytes. Required.

Returns:

    tuple[T, bytes]: A tuple of (cipher instance, raw key bytes).

Note

Keys are stored in the keyring as base64-encoded strings. The service name is modified to include the key class name to allow storing different key types for the same service/username.

Example

>>> from cryptography.fernet import Fernet
>>> cipher, key = get_or_create_key(
...     "my_app",
...     "custom_key",
...     Fernet,
...     Fernet.generate_key,
... )

Source code in src/winiutils/core/security/keyring.py
def get_or_create_key[T](
    service_name: str,
    username: str,
    key_class: Callable[[bytes], T],
    generate_key_func: Callable[..., bytes],
) -> tuple[T, bytes]:
    """Get or create a cryptographic key from the keyring.

    Generic function that retrieves an existing key from the system keyring,
    or generates a new one using the provided generator function if it
    doesn't exist.

    Args:
        service_name: The service name to use for keyring storage.
        username: The username/key identifier within the service.
        key_class: A callable that takes raw key bytes and returns a cipher
            instance (e.g., ``Fernet``, ``AESGCM``).
        generate_key_func: A callable that generates new raw key bytes.

    Returns:
        A tuple of (cipher instance, raw key bytes).

    Note:
        Keys are stored in the keyring as base64-encoded strings. The
        service name is modified to include the key class name to allow
        storing different key types for the same service/username.

    Example:
        >>> from cryptography.fernet import Fernet
        >>> cipher, key = get_or_create_key(
        ...     "my_app",
        ...     "custom_key",
        ...     Fernet,
        ...     Fernet.generate_key,
        ... )
    """
    key = get_key_as_str(service_name, username, key_class)
    if key is None:
        binary_key = generate_key_func()
        key = b64encode(binary_key).decode("ascii")
        modified_service_name = make_service_name(service_name, key_class)
        keyring.set_password(modified_service_name, username, key)

    binary_key = b64decode(key)
    return key_class(binary_key), binary_key
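The get-or-create flow above (look up a base64 string, generate and store on a miss, then decode and wrap) can be sketched without touching a real keyring. In this illustrative model, a plain dict stands in for the OS keyring and `get_or_create_key_sketch` is a hypothetical stand-in, not the library function:

```python
from base64 import b64decode, b64encode

store: dict[tuple[str, str], str] = {}  # stand-in for the OS keyring


def get_or_create_key_sketch(service: str, user: str, key_class, generate):
    key = store.get((service, user))            # keyring.get_password
    if key is None:
        # Miss: generate raw bytes, store them base64-encoded
        key = b64encode(generate()).decode("ascii")
        store[(service, user)] = key            # keyring.set_password
    binary = b64decode(key)
    return key_class(binary), binary


# bytes acts as a trivial "cipher class" for illustration
first = get_or_create_key_sketch("app", "k", bytes, lambda: b"\x01" * 32)
second = get_or_create_key_sketch("app", "k", bytes, lambda: b"\x02" * 32)
assert first == second  # the second call reuses the stored key
```

The second call's generator is never invoked, which is the point of the pattern: the key survives across processes via the keyring, and callers always get the same cipher material back.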
make_service_name(service_name, key_class)

Create a unique service name by combining service name and key class.

This allows storing different key types (Fernet, AESGCM, etc.) for the same service and username combination.

Parameters:

    service_name (str): The base service name. Required.
    key_class (Callable[[bytes], T]): The key class whose name will be appended. Required.

Returns:

    str: A modified service name in the format {service_name}_{class_name}.

Example

>>> make_service_name("my_app", Fernet)
'my_app_Fernet'

Source code in src/winiutils/core/security/keyring.py
def make_service_name[T](service_name: str, key_class: Callable[[bytes], T]) -> str:
    """Create a unique service name by combining service name and key class.

    This allows storing different key types (Fernet, AESGCM, etc.) for the
    same service and username combination.

    Args:
        service_name: The base service name.
        key_class: The key class whose name will be appended.

    Returns:
        A modified service name in the format ``{service_name}_{class_name}``.

    Example:
        >>> make_service_name("my_app", Fernet)
        'my_app_Fernet'
    """
    return f"{service_name}_{key_class.__name__}"  # ty:ignore[unresolved-attribute]

rig

init module.

tests

init module.

fixtures

init module.

fixtures

Fixtures for testing.

This module provides custom fixtures for pytest that can be used by tests across the entire test suite. All fixtures defined under the fixtures package are plugged in automatically by pyrig via the pytest_plugins mechanism.

keyring_cleanup()

Factory fixture to clean up keyring entries after test.

Usage

def test_something(keyring_cleanup):
    keyring_cleanup("service_name", "username")
    # ... test code that creates keyring entries ...

Source code in src/winiutils/rig/tests/fixtures/fixtures.py
@pytest.fixture
def keyring_cleanup() -> Iterator[Callable[[str, str], None]]:
    """Factory fixture to clean up keyring entries after test.

    Usage:
        def test_something(keyring_cleanup):
            keyring_cleanup("service_name", "username")
            # ... test code that creates keyring entries ...
    """
    entries: list[tuple[str, str]] = []

    def register(service_name: str, username: str) -> None:
        entries.append((service_name, username))

    yield register

    for service_name, username in entries:
        keyring.delete_password(service_name, username)
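The register-then-teardown pattern used by this factory fixture can be sketched without pytest by driving the generator by hand (all names below are illustrative; a list records deletions in place of `keyring.delete_password`):

```python
from collections.abc import Callable, Iterator

deleted: list[tuple[str, str]] = []


def cleanup_factory() -> Iterator[Callable[[str, str], None]]:
    # Entries registered during the "test" are deleted on teardown.
    entries: list[tuple[str, str]] = []

    def register(service_name: str, username: str) -> None:
        entries.append((service_name, username))

    yield register  # everything before this runs at setup

    for service_name, username in entries:  # runs at teardown
        deleted.append((service_name, username))


gen = cleanup_factory()
register = next(gen)       # setup: obtain the factory, as pytest would
register("svc", "alice")   # test body registers an entry
next(gen, None)            # teardown: resume past the yield
assert deleted == [("svc", "alice")]
```

pytest drives yield fixtures exactly this way: the code before `yield` is setup, the code after it is teardown, and returning a registration callable lets the test decide which entries need cleanup.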

tools

Tool wrappers for CLI tools used in development workflows.

Tools are subclasses of Tool providing methods that return Args objects for type-safe command construction and execution.

pyrigger

Override pyrig's Pyrigger to add custom dev dependencies.

Pyrigger

Bases: Pyrigger

Override pyrig's Pyrigger to add custom dev dependencies.

Source code in src/winiutils/rig/tools/pyrigger.py
class Pyrigger(BasePyrigger):
    """Override pyrig's Pyrigger to add custom dev dependencies."""

    def dev_dependencies(self) -> tuple[str, ...]:
        """Add custom dev dependencies to pyrig's default list."""
        return (*super().dev_dependencies(), "types-tqdm", "types-defusedxml")
dev_dependencies()

Add custom dev dependencies to pyrig's default list.

Source code in src/winiutils/rig/tools/pyrigger.py
def dev_dependencies(self) -> tuple[str, ...]:
    """Add custom dev dependencies to pyrig's default list."""
    return (*super().dev_dependencies(), "types-tqdm", "types-defusedxml")
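The override pattern here is plain tuple extension via `super()`. A minimal sketch, assuming a hypothetical base class whose defaults are invented for illustration (pyrig's actual default list may differ):

```python
class BasePyrigger:
    """Stand-in for pyrig's Pyrigger (defaults here are hypothetical)."""

    def dev_dependencies(self) -> tuple[str, ...]:
        return ("ruff", "pytest")


class Pyrigger(BasePyrigger):
    def dev_dependencies(self) -> tuple[str, ...]:
        # Unpack the parent's tuple and append project-specific type stubs.
        return (*super().dev_dependencies(), "types-tqdm", "types-defusedxml")


deps = Pyrigger().dev_dependencies()
assert deps == ("ruff", "pytest", "types-tqdm", "types-defusedxml")
```

Unpacking with `*super().dev_dependencies()` keeps the parent's list authoritative: new defaults added upstream flow through without the subclass changing.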