
event_quality_warnings

EventQualityWarnings

Bases: Component

Component that calculates Quality Warnings from the Quality Metrics generated by the Event Cleaning component
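A minimal usage sketch (the config file paths are hypothetical, not part of this page): the component is driven entirely by the two configuration files passed to its constructor, and execute() runs the read() -> transform() -> write() sequence defined in the source below.

component = EventQualityWarnings(
    general_config_path="configs/general.ini",  # illustrative path
    component_config_path="configs/event_quality_warnings.ini",  # illustrative path
)
component.execute()  # read() -> transform() -> write()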

Source code in multimno/components/quality/event_quality_warnings/event_quality_warnings.py
class EventQualityWarnings(Component):
    """
    Component that calculates Quality Warnings from the Quality Metrics generated by the Event Cleaning component
    """

    COMPONENT_ID = "EventQualityWarnings"

    dict_convert_to_num_days = {"week": 7, "month": 30}
    # dict to store info regarding error type
    # first element - corresponding encoding of ErrorTypes class
    # second element - naming constants for corresponding measure definitions, conditions, and warning texts
    dict_error_type_info = {
        "missing_value": [ErrorTypes.NULL_VALUE, "Missing value rate"],
        "not_right_syntactic_format": [
            ErrorTypes.CANNOT_PARSE,
            "Wrong type/format rate",
        ],
        "out_of_admissible_values": [
            ErrorTypes.OUT_OF_RANGE,
            "Out of range rate",
        ],
        "no_location": [ErrorTypes.NO_LOCATION_INFO, "No location error rate"],
        "no_domain": [ErrorTypes.NO_MNO_INFO, "No domain error rate"],
        "out_of_bounding_box": [
            ErrorTypes.OUT_OF_RANGE,
            "Out of bounding box error rate",
        ],
        "same_location_duplicate": [
            ErrorTypes.DUPLICATED,
            "Deduplication same locations rate",
        ],
    }

    def __init__(self, general_config_path: str, component_config_path: str) -> None:
        super().__init__(general_config_path, component_config_path)

        print(self.config)
        self.lookback_period = self.config.get(EventQualityWarnings.COMPONENT_ID, "lookback_period")
        self.lookback_period_in_days = self.dict_convert_to_num_days[self.lookback_period]

        self.data_period_start = self.config.get(EventQualityWarnings.COMPONENT_ID, "data_period_start")
        self.data_period_end = self.config.get(EventQualityWarnings.COMPONENT_ID, "data_period_end")

        self.qw_dfs_log = []
        self.qw_dfs_plots = []

        # FOR SYNTACTIC QUALITY WARNINGS
        self.do_size_raw_data_qw = self.config.getboolean(
            EventQualityWarnings.COMPONENT_ID, "do_size_raw_data_qw", fallback=False
        )
        self.do_size_clean_data_qw = self.config.getboolean(
            EventQualityWarnings.COMPONENT_ID, "do_size_clean_data_qw", fallback=False
        )

        self.data_size_tresholds = self.config.geteval(
            EventQualityWarnings.COMPONENT_ID, "data_size_tresholds", fallback=None
        )

        self.do_error_rate_by_date_qw = self.config.getboolean(
            EventQualityWarnings.COMPONENT_ID,
            "do_error_rate_by_date_qw",
            fallback=False,
        )

        self.do_error_rate_by_date_and_cell_qw = self.config.getboolean(
            EventQualityWarnings.COMPONENT_ID,
            "do_error_rate_by_date_and_cell_qw",
            fallback=False,
        )

        self.do_error_rate_by_date_and_user_qw = self.config.getboolean(
            EventQualityWarnings.COMPONENT_ID,
            "do_error_rate_by_date_and_user_qw",
            fallback=False,
        )

        self.do_error_rate_by_date_and_cell_user_qw = self.config.getboolean(
            EventQualityWarnings.COMPONENT_ID,
            "do_error_rate_by_date_and_cell_user_qw",
            fallback=False,
        )

        self.error_rate_tresholds = self.config.geteval(
            EventQualityWarnings.COMPONENT_ID, "error_rate_tresholds", fallback=None
        )

        self.error_type_qw_checks = self.config.geteval(
            EventQualityWarnings.COMPONENT_ID, "error_type_qw_checks", fallback=None
        )

        self.missing_value_thresholds = self.config.geteval(
            EventQualityWarnings.COMPONENT_ID, "missing_value_thresholds", fallback=None
        )

        self.out_of_admissible_values_thresholds = self.config.geteval(
            EventQualityWarnings.COMPONENT_ID,
            "out_of_admissible_values_thresholds",
            fallback=None,
        )

        self.not_right_syntactic_format_thresholds = self.config.geteval(
            EventQualityWarnings.COMPONENT_ID,
            "not_right_syntactic_format_thresholds",
            fallback=None,
        )

        self.no_location_thresholds = self.config.geteval(
            EventQualityWarnings.COMPONENT_ID, "no_location_thresholds", fallback=None
        )

        self.no_domain_thresholds = self.config.geteval(
            EventQualityWarnings.COMPONENT_ID, "no_domain_thresholds", fallback=None
        )

        self.out_of_bounding_box_thresholds = self.config.geteval(
            EventQualityWarnings.COMPONENT_ID,
            "out_of_bounding_box_thresholds",
            fallback=None,
        )
        # FOR DEDUPLICATION QUALITY WARNINGS
        self.deduplication_same_location_thresholds = self.config.geteval(
            EventQualityWarnings.COMPONENT_ID,
            "deduplication_same_location_thresholds",
            fallback=None,
        )

    def initalize_data_objects(self):
        self.input_qm_data_objects = {}
        self.output_qw_data_objects = {}
        self.clear_destination_directory = self.config.getboolean(
            EventQualityWarnings.COMPONENT_ID, "clear_destination_directory"
        )

        self.input_qm_by_column_path_key = self.config.get(
            EventQualityWarnings.COMPONENT_ID, "input_qm_by_column_path_key"
        )
        self.input_qm_freq_distr_path_key = self.config.get(
            EventQualityWarnings.COMPONENT_ID, "input_qm_freq_distr_path_key"
        )
        self.output_qw_log_table_path_key = self.config.get(
            EventQualityWarnings.COMPONENT_ID, "output_qw_log_table_path_key"
        )
        self.output_qw_for_plots_path_key = self.config.get(
            EventQualityWarnings.COMPONENT_ID,
            "output_qw_for_plots_path_key",
            fallback=None,
        )

        self.input_qm_by_column_path = self.config.get(CONFIG_SILVER_PATHS_KEY, self.input_qm_by_column_path_key)
        if check_if_data_path_exists(self.spark, self.input_qm_by_column_path):
            self.input_qm_data_objects[SilverEventDataSyntacticQualityMetricsByColumn.ID] = (
                SilverEventDataSyntacticQualityMetricsByColumn(self.spark, self.input_qm_by_column_path)
            )
        else:
            self.logger.warning("Wrong path for Quality Metrics By Column, terminating component execution")
            raise ValueError("Invalid path for Quality Metrics By Column")

        self.input_qm_freq_distr_path = self.config.get(CONFIG_SILVER_PATHS_KEY, self.input_qm_freq_distr_path_key)
        if check_if_data_path_exists(self.spark, self.input_qm_freq_distr_path):
            self.input_qm_data_objects[SilverEventDataSyntacticQualityMetricsFrequencyDistribution.ID] = (
                SilverEventDataSyntacticQualityMetricsFrequencyDistribution(self.spark, self.input_qm_freq_distr_path)
            )
        else:
            self.logger.warning(
                "Wrong path for Quality Metrics Frequency Distribution, terminating component execution"
            )
            raise ValueError("Invalid path for Quality Metrics Frequency Distribution")

        self.output_qw_log_table_path = self.config.get(CONFIG_SILVER_PATHS_KEY, self.output_qw_log_table_path_key)
        if self.clear_destination_directory:
            delete_file_or_folder(self.spark, self.output_qw_log_table_path)
        check_or_create_data_path(self.spark, self.output_qw_log_table_path)
        self.output_qw_data_objects[SilverEventDataSyntacticQualityWarningsLogTable.ID] = (
            SilverEventDataSyntacticQualityWarningsLogTable(self.spark, self.output_qw_log_table_path)
        )
        # the plots output is optional; it is not intended for EventDeduplicationQualityWarnings
        if self.output_qw_for_plots_path_key is not None:
            self.output_qw_for_plots_path = self.config.get(CONFIG_SILVER_PATHS_KEY, self.output_qw_for_plots_path_key)
            if self.clear_destination_directory:
                delete_file_or_folder(self.spark, self.output_qw_for_plots_path)
            check_or_create_data_path(self.spark, self.output_qw_for_plots_path)
            self.output_qw_data_objects[SilverEventDataSyntacticQualityWarningsForPlots.ID] = (
                SilverEventDataSyntacticQualityWarningsForPlots(self.spark, self.output_qw_for_plots_path)
            )

    def read(self):
        self.input_qm_data_objects[SilverEventDataSyntacticQualityMetricsByColumn.ID].read()
        self.input_qm_data_objects[SilverEventDataSyntacticQualityMetricsFrequencyDistribution.ID].read()

    def write(self):
        self.save_quality_warnings_log_table(self.qw_dfs_log)
        if self.output_qw_for_plots_path_key is not None:
            self.save_quality_warnings_for_plots(self.qw_dfs_plots)

    def execute(self):
        self.logger.info(f"Starting {EventQualityWarnings.COMPONENT_ID}...")
        self.read()
        self.transform()  # Transforms the input_df
        self.write()
        self.logger.info(f"Finished {EventQualityWarnings.COMPONENT_ID}")

    def transform(self):
        self.logger.info(f"Transform method {EventQualityWarnings.COMPONENT_ID}")
        # Read the QA Metrics of the EventCleaning component; the period of interest is
        #  [data_period_start-lookback_period_in_days, data_period_end],
        # since Quality Warnings are calculated based on prior data
        # TODO: deal with cases when df_qa_by_column, df_qa_freq_distribution do not have data for
        # the whole defined period
        # TODO: dynamically define/check the possible research period of QW based on the data period
        #  of df_qa_by_column and df_qa_freq_distribution
        # TODO: implement a min_period conf param, the minimal number of days of previous data
        #  required to calculate QW (the case for the first days of the research period)
        sdate = pd.to_datetime(self.data_period_start) - pd.Timedelta(days=self.lookback_period_in_days)
        edate = pd.to_datetime(self.data_period_end)

        df_qa_by_column = self.input_qm_data_objects[SilverEventDataSyntacticQualityMetricsByColumn.ID].df.where(
            psf.col(ColNames.date).between(sdate, edate)
        )

        df_qa_freq_distribution = self.input_qm_data_objects[
            SilverEventDataSyntacticQualityMetricsFrequencyDistribution.ID
        ].df.where(psf.col(ColNames.date).between(sdate, edate))

        df_qa_by_column = df_qa_by_column.cache()
        # TODO: it may make sense to first sum init and final freq, cache, and pass this aggregation
        # to further QW functions
        df_qa_freq_distribution = df_qa_freq_distribution.cache()

        if self.do_size_raw_data_qw:
            # for raw data size QW compute warnings and also retrieve data to plot the distribution of initial frequency
            df_raw_data_qw, df_raw_plots = self.data_size_qw(
                df_qa_freq_distribution,
                self.lookback_period_in_days,
                self.data_size_tresholds["SIZE_RAW_DATA_BYDATE_VARIABILITY"],
                self.data_size_tresholds["SIZE_RAW_DATA_BYDATE_ABS_VALUE_LOWER_LIMIT"],
                self.data_size_tresholds["SIZE_RAW_DATA_BYDATE_ABS_VALUE_UPPER_LIMIT"],
                type_of_data="raw",
            )
            self.qw_dfs_log.append(df_raw_data_qw)
            self.qw_dfs_plots.append(df_raw_plots)

        if self.do_size_clean_data_qw:
            # for clean data size QW compute warnings and also retrieve data to plot the distribution of total frequency
            df_clean_data_qw, df_clean_plots = self.data_size_qw(
                df_qa_freq_distribution,
                self.lookback_period_in_days,
                self.data_size_tresholds["SIZE_CLEAN_DATA_BYDATE_VARIABILITY"],
                self.data_size_tresholds["SIZE_CLEAN_DATA_BYDATE_ABS_VALUE_LOWER_LIMIT"],
                self.data_size_tresholds["SIZE_CLEAN_DATA_BYDATE_ABS_VALUE_UPPER_LIMIT"],
                type_of_data="clean",
            )
            self.qw_dfs_log.append(df_clean_data_qw)
            self.qw_dfs_plots.append(df_clean_plots)

        if self.do_error_rate_by_date_qw:
            # for error rate by date QW compute warnings and also retrieve data to
            # plot the distribution of error rate by date
            df_error_rate_by_date_qw, df_error_rate_plots = self.error_rate_qw(
                df_qa_freq_distribution,
                self.lookback_period_in_days,
                [ColNames.date],
                self.error_rate_tresholds["TOTAL_ERROR_RATE_BYDATE_OVER_AVERAGE"],
                self.error_rate_tresholds["TOTAL_ERROR_RATE_BYDATE_VARIABILITY"],
                self.error_rate_tresholds["TOTAL_ERROR_RATE_BYDATE_ABS_VALUE_UPPER_LIMIT"],
                save_data_for_plots=True,
            )

            self.qw_dfs_log.append(df_error_rate_by_date_qw)
            self.qw_dfs_plots.append(df_error_rate_plots)
        # The current agreement is that the following, more granular error rates do not store any data for plots,
        # although it could be done with save_data_for_plots=True
        # TODO: should we consider the error rate of null user_id and/or null cell_id in the QW computation
        if self.do_error_rate_by_date_and_cell_qw:
            df_error_rate_by_date_and_cell_qw, _ = self.error_rate_qw(
                df_qa_freq_distribution,
                self.lookback_period_in_days,
                [ColNames.date, ColNames.cell_id],
                self.error_rate_tresholds["ERROR_RATE_BYDATE_BYCELL_OVER_AVERAGE"],
                self.error_rate_tresholds["ERROR_RATE_BYDATE_BYCELL_VARIABILITY"],
                self.error_rate_tresholds["ERROR_RATE_BYDATE_BYCELL_ABS_VALUE_UPPER_LIMIT"],
            )

            self.qw_dfs_log.append(df_error_rate_by_date_and_cell_qw)

        if self.do_error_rate_by_date_and_user_qw:
            df_error_rate_by_date_and_user_qw, _ = self.error_rate_qw(
                df_qa_freq_distribution,
                self.lookback_period_in_days,
                [ColNames.date, ColNames.user_id],
                self.error_rate_tresholds["ERROR_RATE_BYDATE_BYUSER_OVER_AVERAGE"],
                self.error_rate_tresholds["ERROR_RATE_BYDATE_BYUSER_VARIABILITY"],
                self.error_rate_tresholds["ERROR_RATE_BYDATE_BYUSER_ABS_VALUE_UPPER_LIMIT"],
            )

            self.qw_dfs_log.append(df_error_rate_by_date_and_user_qw)

        if self.do_error_rate_by_date_and_cell_user_qw:
            df_error_rate_by_date_and_cell_user_qw, _ = self.error_rate_qw(
                df_qa_freq_distribution,
                self.lookback_period_in_days,
                [ColNames.date, ColNames.cell_id, ColNames.user_id],
                self.error_rate_tresholds["ERROR_RATE_BYDATE_BYCELL_USER_OVER_AVERAGE"],
                self.error_rate_tresholds["ERROR_RATE_BYDATE_BYCELL_USER_VARIABILITY"],
                self.error_rate_tresholds["ERROR_RATE_BYDATE_BYCELL_USER_ABS_VALUE_UPPER_LIMIT"],
            )

            self.qw_dfs_log.append(df_error_rate_by_date_and_cell_user_qw)

        # The previous two types of QW used df_qa_freq_distribution only
        # Now calculate error rate for different error types like missing_value, wrong type
        # based on two QA metrics - df_qa_by_column and df_qa_freq_distribution
        # error_type_qw_checks - dict('error_type':[relevant columns])
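        # illustrative shape (not from the source): {"missing_value": ["cell_id", "user_id"], "no_location": [None]}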
        for error_type, field_names in self.error_type_qw_checks.items():
            if field_names == []:
                self.logger.info(f"No field name(s) were specified for error type: {error_type}")
            else:
                # if you have a new error_type and thus a new error_type_thresholds entry in the config,
                # make sure to add it to the class attributes and to this block with an elif statement
                if error_type == "missing_value":
                    error_type_thresholds = self.missing_value_thresholds
                elif error_type == "out_of_admissible_values":
                    error_type_thresholds = self.out_of_admissible_values_thresholds
                elif error_type == "not_right_syntactic_format":
                    error_type_thresholds = self.not_right_syntactic_format_thresholds
                elif error_type == "no_location":
                    error_type_thresholds = self.no_location_thresholds
                elif error_type == "no_domain":
                    error_type_thresholds = self.no_domain_thresholds
                elif error_type == "out_of_bounding_box":
                    error_type_thresholds = self.out_of_bounding_box_thresholds
                elif error_type == "same_location_duplicate":
                    error_type_thresholds = self.deduplication_same_location_thresholds
                else:
                    self.logger.warning(
                        f"Unexpected error type in error_type_qw_checks config param"
                        f": {error_type}, skipping calculation for this qw"
                    )
                    continue

            for field_name in field_names:
                if field_name in error_type_thresholds.keys():
                    error_type_qw, _ = self.error_type_rate_qw(
                        df_qa_by_column,
                        df_qa_freq_distribution,
                        field_name,
                        error_type,
                        self.lookback_period_in_days,
                        *list(error_type_thresholds[field_name].values()),
                    )
                    self.qw_dfs_log.append(error_type_qw)
                else:
                    self.logger.warning(
                        f"No thresholds were specified for field {field_name} of {error_type} error_type"
                    )

        self.spark.catalog.clearCache()

    def data_size_qw(
        self,
        df_freq_distribution: DataFrame,
        lookback_period_in_days: int,
        variablility: Union[int, float],
        lower_limit: Union[int, float],
        upper_limit: Union[int, float],
        type_of_data: str,
        measure_definition_canva: str = f"{MeasureDefinitions.size_data}",
        cond_warn_variability_canva: str = f"{Conditions.size_data_variability}-{Warnings.size_data_variability}",
        cond_warn_upper_lower_canva: str = f"{Conditions.size_data_upper_lower}-{Warnings.size_data_upper_lower}",
    ) -> Tuple[DataFrame]:
        """
        A unified function to check both raw and clean data sizes; calculates four types of QWs:
        LOWER_VARIABILITY - for each row, using the calculated mean and std, compute the lower variability limit
            (mean - SD*variability) and check if daily_value is lower than this limit
        UPPER_VARIABILITY - for each row, using the calculated mean and std, compute the upper variability limit
            (mean + SD*variability) and check if daily_value exceeds this limit
        ABS_LOWER_LIMIT - check if daily_value is lower than the absolute number lower_limit
        ABS_UPPER_LIMIT - check if daily_value exceeds the absolute number upper_limit
        All four QWs depend on thresholds; whenever a condition is met, the cond-warn-condition_value
            information is stored in an array column,
        so some rows may have several QWs. In the end the array column is exploded and the cond-warn-condition_value
            information is split into three corresponding columns.
        The function returns almost-ready dfs for the SilverEventDataSyntacticQualityWarningsLogTable
            and SilverEventDataSyntacticQualityWarningsForPlots DOs

        Args:
            df_freq_distribution (DataFrame): df with frequency data
            lookback_period_in_days (int): length of the lookback period in days
            variablility (Union[int, float]): config param, the number of SD used to define the upper and lower
                variability limits (mean_size ± SD*variability) that daily_value should stay within
            lower_limit (Union[int, float]): absolute number below which daily_value should not fall
            upper_limit (Union[int, float]): absolute number which daily_value cannot exceed
            type_of_data (str): which type of data, raw or clean, to check for QWs
            measure_definition_canva (str): canva text to use for measure_definition column (see measure_definition.py)
            cond_warn_variability_canva (str): canva text to use for lower_upper_variability cases
                of data_size QWs (see conditions.py and warnings.py)
            cond_warn_upper_lower_canva (str): canva text to use for lower_upper_limit cases
                of data_size QWs (see conditions.py and warnings.py)

        Returns:
            tuple(DataFrame, DataFrame): a tuple where the first df
                is used for the warning log table and the second df for plots
        """
        # based on type_of_data calculate either total daily initial frequency or total daily final frequency
        if type_of_data == "raw":
            sum_column = ColNames.initial_frequency
        else:
            sum_column = ColNames.final_frequency
        # fill in string canvases
        measure_definition = measure_definition_canva.format(type_of_data=type_of_data)
        cond_warn_variability = cond_warn_variability_canva.format(SD=variablility, type_of_data=type_of_data)
        cond_warn_upper_lower = cond_warn_upper_lower_canva.format(
            X=lower_limit, Y=upper_limit, type_of_data=type_of_data
        )
        # define lookback period
        window = Window.orderBy(ColNames.date).rowsBetween(-lookback_period_in_days, -1)
        # prepare data
        # calculate mean and std over window, and based on them UCL - for UPPER_VARIABILITY check and LCL
        #   - for LOWER_VARIABILITY check
        # create empty array cond_warn_condition_value column to store information about qws
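        # illustrative numbers (not from the source): with variablility = 3, an average of 950_000
        # and std_freq of 40_000, UCL = 1_070_000 and LCL = 830_000 for that date's checks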
        df_prep = (
            df_freq_distribution.groupBy(ColNames.date)
            .agg(psf.sum(sum_column).alias(ColNames.daily_value))
            .withColumns(
                {
                    ColNames.average: psf.avg(ColNames.daily_value).over(window),
                    "std_freq": psf.stddev(ColNames.daily_value).over(window),
                    ColNames.UCL: psf.col(ColNames.average) + variablility * psf.col("std_freq"),
                    ColNames.LCL: psf.col(ColNames.average) - variablility * psf.col("std_freq"),
                    "cond_warn_condition_value": psf.array(),
                }
            )
        )

        df_prep = df_prep.cache()
        # continue with QWs checks
        # first filter data by period from [data_period_start-lookback_period_in_days, data_period_end]
        # to [data_period_start, data_period_end]
        # - a specified research period of QW
        df_qw = df_prep.filter(psf.col(ColNames.date) >= self.data_period_start)
        # perform LOWER_VARIABILITY, UPPER_VARIABILITY, ABS_LOWER_LIMIT and ABS_UPPER_LIMIT checks
        # if condition is met append information about condition-warning_text-condition_value as a string
        # into array column 'cond_warn_condition_value'
        df_qw = (
            df_qw.withColumn(
                "cond_warn_condition_value",
                psf.when(
                    (psf.col(ColNames.daily_value) < psf.col(ColNames.LCL)),
                    psf.array_append(
                        psf.col("cond_warn_condition_value"),
                        psf.concat(
                            psf.lit(f"{cond_warn_variability}-"),
                            psf.col(ColNames.LCL).cast("string"),
                        ),
                    ),
                ).otherwise(psf.col("cond_warn_condition_value")),
            )
            .withColumn(
                "cond_warn_condition_value",
                psf.when(
                    (psf.col(ColNames.daily_value) > psf.col(ColNames.UCL)),
                    psf.array_append(
                        psf.col("cond_warn_condition_value"),
                        psf.concat(
                            psf.lit(f"{cond_warn_variability}-"),
                            psf.col(ColNames.UCL).cast("string"),
                        ),
                    ),
                ).otherwise(psf.col("cond_warn_condition_value")),
            )
            .withColumn(
                "cond_warn_condition_value",
                psf.when(
                    (psf.col(ColNames.daily_value) < psf.lit(lower_limit)),
                    psf.array_append(
                        psf.col("cond_warn_condition_value"),
                        psf.lit(f"{cond_warn_upper_lower}-{str(lower_limit)}"),
                    ),
                ).otherwise(psf.col("cond_warn_condition_value")),
            )
            .withColumn(
                "cond_warn_condition_value",
                psf.when(
                    (psf.col(ColNames.daily_value) > psf.lit(upper_limit)),
                    psf.array_append(
                        psf.col("cond_warn_condition_value"),
                        psf.lit(f"{cond_warn_upper_lower}-{str(upper_limit)}"),
                    ),
                ).otherwise(psf.col("cond_warn_condition_value")),
            )
        )
        # explode array column 'cond_warn_condition_value'
        df_qw = df_qw.withColumn(
            "cond_warn_condition_value",
            psf.explode(psf.col("cond_warn_condition_value")),
        )
        # add some column constants
        # split "cond_warn_condition_value" column into three: condition, warning_text, condition_value to
        # match SilverEventDataSyntacticQualityWarningsLogTable.SCHEMA
        # select only needed columns
        df_qw = df_qw.withColumns(
            {
                ColNames.lookback_period: psf.lit(self.lookback_period),
                ColNames.measure_definition: psf.lit(measure_definition),
                ColNames.condition: psf.split(psf.col("cond_warn_condition_value"), "-").getItem(0),
                ColNames.warning_text: psf.split(psf.col("cond_warn_condition_value"), "-").getItem(1),
                ColNames.condition_value: psf.split(psf.col("cond_warn_condition_value"), "-").getItem(2).cast("float"),
            }
        ).select(self.output_qw_data_objects[SilverEventDataSyntacticQualityWarningsLogTable.ID].SCHEMA.fieldNames())

        # save data for plots
        # no filter by date because the first days of the period need previous data for the plots
        df_plots = df_prep.withColumns(
            {
                ColNames.lookback_period: psf.lit(self.lookback_period),
                ColNames.type_of_qw: psf.lit(f"{type_of_data}_data_size"),
            }
        ).select(self.output_qw_data_objects[SilverEventDataSyntacticQualityWarningsForPlots.ID].SCHEMA.fieldNames())

        return df_qw, df_plots

    def error_rate_qw(
        self,
        df_freq_distribution: DataFrame,
        lookback_period_in_days: int,
        variables: List[str],
        error_rate_over_average: Union[int, float],
        error_rate_upper_variability: Union[int, float],
        error_rate_upper_limit: Union[int, float],
        error_rate_measure_definition_canva: str = f"{MeasureDefinitions.error_rate}",
        error_rate_cond_warn_over_average_canva: str = f"{Conditions.error_rate_over_average}-{Warnings.error_rate_over_average}",
        error_rate_cond_warn_upper_variability_canva: str = f"{Conditions.error_rate_upper_variability}-{Warnings.error_rate_upper_variability}",
        error_rate_cond_warn_upper_limit_canva: str = f"{Conditions.error_rate_upper_limit}-{Warnings.error_rate_upper_limit}",
        save_data_for_plots: bool = False,
    ) -> Tuple[Union[DataFrame, None]]:
        """
        Prepare data for the error rate calculation. First fill in the different string canvases,
            then define the window of aggregation, and calculate error_rate over the window using the following
            formula: (Total initial frequency - Total final frequency) / Total initial frequency * 100.
            Pass the preprocessed input to the self.rate_common_qw function, which calculates three types
                of QWs: OVER_AVERAGE, UPPER_VARIABILITY, and ABS_UPPER_LIMIT

        Args:
            df_freq_distribution (DataFrame): df with frequency data.
            lookback_period_in_days (int): number of days prior to date of interest.
            variables (List[str]): list of column names by which the error rate is calculated, i.e. the granularity level
            error_rate_over_average (Union[int, float]): config param, the percentage by which a daily value
                may not exceed its corresponding mean error rate
            error_rate_upper_variability (Union[int, float]): config param, the number of SD used to define the upper
                variability limit (mean_rate + SD*error_rate_upper_variability), which the error rate cannot exceed
            error_rate_upper_limit (Union[int, float]): absolute number which the error rate cannot exceed
            error_rate_measure_definition_canva (str): canva text to use for measure_definition
                column (see measure_definition.py)
            error_rate_cond_warn_over_average_canva (str): canva text to use for over_average cases of
                error_rate QWs (see conditions.py and warnings.py)
            error_rate_cond_warn_upper_variability_canva (str): canva text to use for upper_variability cases of
                error_rate QWs (see conditions.py and warnings.py)
            error_rate_cond_warn_upper_limit_canva (str): canva text to use for upper_limit cases of error_rate
                QWs (see conditions.py and warnings.py)
            save_data_for_plots (bool): boolean, decide whether to store error rate and its corresponding average
                and upper variability limit for plots, default False
        Returns:
            tuple(Union[DataFrame, None]): a tuple where the first df is used for the warning log table
                and the second df for plots (may also be None)
        """
        # fill in all string constants with relevant information
        # based on variables, error_rate_over_average, error_rate_upper_variability, error_rate_upper_limit args
        error_rate_measure_definition = error_rate_measure_definition_canva.format(variables="&".join(variables))

        error_rate_cond_warn_over_average = error_rate_cond_warn_over_average_canva.format(
            variables="&".join(variables), X=error_rate_over_average
        )
        error_rate_cond_warn_upper_variability = error_rate_cond_warn_upper_variability_canva.format(
            variables="&".join(variables), SD=error_rate_upper_variability
        )
        error_rate_cond_warn_upper_limit = error_rate_cond_warn_upper_limit_canva.format(
            variables="&".join(variables), X=error_rate_upper_limit
        )
        # qws of error rate by date are calculated based on previous days
        if variables == [ColNames.date]:
            window = Window.orderBy(ColNames.date).rowsBetween(-lookback_period_in_days, -1)
        else:
            # qws of error rate by date and other column(s) are calculated over all "data points" of the same date
            window = Window.partitionBy(ColNames.date)
        # calculate error rate, a.k.a daily_value
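        # illustrative numbers (not from the source): sum_init_freq = 1000, sum_final_freq = 940
        # -> daily_value = (1000 - 940) / 1000 * 100 = 6.0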
        df_qw = (
            df_freq_distribution.groupBy(*variables)
            .agg(
                psf.sum(ColNames.initial_frequency).alias("sum_init_freq"),
                psf.sum(ColNames.final_frequency).alias("sum_final_freq"),
            )
            .withColumn(
                ColNames.daily_value,
                (psf.col("sum_init_freq") - psf.col("sum_final_freq")) / psf.col("sum_init_freq") * 100,
            )
        )
        # using the self.rate_common_qw function calculate three types of QWs
        #   -> OVER_AVERAGE, UPPER_VARIABILITY, and ABS_UPPER_LIMIT
        # return a tuple of (df_log, df_plots | None)
        qw_result = self.rate_common_qw(
            df_qw,
            window,
            error_rate_upper_variability,
            error_rate_over_average,
            error_rate_upper_limit,
            error_rate_measure_definition,
            error_rate_cond_warn_upper_variability,
            error_rate_cond_warn_over_average,
            error_rate_cond_warn_upper_limit,
            save_data_for_plots,
        )

        return qw_result

    def error_type_rate_qw(
        self,
        df_qa_by_column: DataFrame,
        df_freq_distribution: DataFrame,
        field_name: Union[str, None],
        error_type: str,
        lookback_period_in_days: int,
        error_type_rate_over_average: Union[int, float],
        error_type_rate_upper_variability: Union[int, float],
        error_type_rate_upper_limit: Union[int, float],
        error_type_rate_measure_definition_canva: str = f"{MeasureDefinitions.error_type_rate}",
        error_type_rate_cond_warn_over_average_canva: str = f"{Conditions.error_type_rate_over_average}-{Warnings.error_type_rate_over_average}",
        error_type_rate_cond_warn_upper_variability_canva: str = f"{Conditions.error_type_rate_upper_variability}-{Warnings.error_type_rate_upper_variability}",
        error_type_rate_cond_warn_upper_limit_canva: str = f"{Conditions.error_type_rate_upper_limit}-{Warnings.error_type_rate_upper_limit}",
    ) -> Tuple[Union[DataFrame, None]]:
        """
        Prepare data for the error type rate calculation. First fill in the different string canvases, then based
            on the field name and error type calculate their corresponding error rate using the formula:
            number of errors of this error_type & field_name combo / Total initial frequency * 100 (BY DATE).
            Pass the preprocessed input along with a window (which is the lookback period)
            to the self.rate_common_qw function, which calculates three types of QWs:
            OVER_AVERAGE, UPPER_VARIABILITY, and ABS_UPPER_LIMIT


        Args:
            df_qa_by_column (DataFrame): df with qa by column data.
            df_freq_distribution (DataFrame): df with frequency data.
            field_name (str | None): config param, the name of the column for which to check the error_type.
            error_type (str): config param, the name of the error type.
            lookback_period_in_days (int): number of days prior to the date of interest.
            error_type_rate_over_average (Union[int, float]): config param, the percentage by which a daily
                value may not exceed its corresponding mean error rate.
            error_type_rate_upper_variability (Union[int, float]): config param, the number of SD used to define the
                upper variability limit (mean_rate + SD*error_type_rate_upper_variability), which daily value cannot exceed
            error_type_rate_upper_limit (Union[int, float]): absolute number which daily value cannot exceed
            error_type_rate_measure_definition_canva (str): canva text to use for measure_definition
                column (see measure_definition.py)
            error_type_rate_cond_warn_over_average_canva (str): canva text to use for over_average cases of
                error_type_rate QWs (see conditions.py and warnings.py)
            error_type_rate_cond_warn_upper_variability_canva (str): canva text to use for upper_variability
                cases of error_type_rate QWs (see conditions.py and warnings.py)
            error_type_rate_cond_warn_upper_limit_canva (str): canva text to use for upper_limit cases of
                error_type_rate QWs (see conditions.py and warnings.py)
        Returns:
            tuple(Union[DataFrame, None]): a tuple where the first df is used for the warning log table
                and the second df is for plots; since save_data_for_plots is always False here, it is None
        """
        # based on error_type, error_type_rate_over_average, error_type_rate_upper_variability,
        #  error_type_rate_upper_limit
        # fill in string canvases
        colname_error_type, error_type_qw_name = self.dict_error_type_info[error_type]

        error_type_rate_measure_definition = error_type_rate_measure_definition_canva.format(
            error_type_name=error_type_qw_name, field_name=str(field_name)
        )

        error_type_rate_cond_warn_over_average = error_type_rate_cond_warn_over_average_canva.format(
            error_type_name=error_type_qw_name,
            field_name=str(field_name),
            X=error_type_rate_over_average,
        )
        error_type_rate_cond_warn_upper_variability = error_type_rate_cond_warn_upper_variability_canva.format(
            error_type_name=error_type_qw_name,
            field_name=str(field_name),
            SD=error_type_rate_upper_variability,
        )
        error_type_rate_cond_warn_upper_limit = error_type_rate_cond_warn_upper_limit_canva.format(
            error_type_name=error_type_qw_name,
            field_name=str(field_name),
            X=error_type_rate_upper_limit,
        )
        # for error_types that apply to one or more specific columns,
        # filter df_qa_by_column by field_name and error_type
        if field_name is not None:
            df_qa_by_column = df_qa_by_column.filter(
                (psf.col(ColNames.variable) == field_name) & (psf.col(ColNames.type_of_error) == colname_error_type)
            ).select(ColNames.date, ColNames.value)
        else:
            # for error_types which do not belong to one specific event
            # column, filter only by error_type (e.g. the no_location error_type)
            df_qa_by_column = df_qa_by_column.filter(psf.col(ColNames.type_of_error) == colname_error_type).select(
                ColNames.date, ColNames.value
            )
        # calculate total daily initial frequency
        df_freq_distribution = (
            df_freq_distribution.groupby(ColNames.date)
            .agg(psf.sum(ColNames.initial_frequency).alias("sum_init_freq"))
            .select(ColNames.date, "sum_init_freq")
        )
        # for each date combine the two types of information: number of errors and total daily initial frequency
        df_combined = df_qa_by_column.join(df_freq_distribution, on=ColNames.date, how="inner")
        # for each date calculate error_type_rate, a.k.a daily_value
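        # illustrative numbers (not from the source): 25 errors of this type out of 1000 initial events
        # -> daily_value = 25 / 1000 * 100 = 2.5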
        df_temp = df_combined.withColumn(
            ColNames.daily_value,
            (psf.col(ColNames.value) / psf.col("sum_init_freq")) * 100,
        )

        # qws will be calculated based on previous days
        window = Window.orderBy(ColNames.date).rowsBetween(-lookback_period_in_days, -1)
        # using the self.rate_common_qw function calculate three types of QWs ->
        # OVER_AVERAGE, UPPER_VARIABILITY, and ABS_UPPER_LIMIT
        # return a tuple of (df_log, None)
        qw_result = self.rate_common_qw(
            df_temp,
            window,
            error_type_rate_upper_variability,
            error_type_rate_over_average,
            error_type_rate_upper_limit,
            error_type_rate_measure_definition,
            error_type_rate_cond_warn_upper_variability,
            error_type_rate_cond_warn_over_average,
            error_type_rate_cond_warn_upper_limit,
        )
        return qw_result

    def rate_common_qw(
        self,
        df_temp: DataFrame,
        window: Window,
        rate_upper_variability: Union[int, float],
        rate_over_average: Union[int, float],
        rate_upper_limit: Union[int, float],
        measure_definition: str,
        cond_warn_upper_variability: str,
        cond_warn_over_average: str,
        cond_warn_upper_limit: str,
        save_data_for_plots: bool = False,
    ) -> Tuple[Union[DataFrame, None]]:
        """
        Takes an input df with a "daily_value" column and calculates three types of QWs:
        OVER_AVERAGE - for each row, first take the mean of values over the specified window, then check if
            daily_value exceeds the mean by more than rate_over_average percent
        UPPER_VARIABILITY - for each row, using the already calculated mean, compute the upper variability limit
            (mean + SD*rate_upper_variability) and check if daily_value exceeds it
        ABS_UPPER_LIMIT - check if daily_value exceeds the absolute number rate_upper_limit
        All three QWs depend on the specified thresholds; if daily_value exceeds one of the calculated values, the
            cond-warn-condition_value information is stored in an array column,
        so some rows may have several QWs. In the end the array column is exploded and the cond-warn-condition_value
            information is split into three corresponding columns.
        The function returns an almost-ready df for the SilverEventDataSyntacticQualityWarningsLogTable DO and, based
            on the save_data_for_plots arg, either almost-ready data for plots or None

        Args:
            df_temp (DataFrame): temporary data that must have a daily_value column to
                be used in further QW calculations
            window (Window): a window within which to perform the aggregation
            rate_upper_variability (int|float): config param, the number of SD used to define the upper variability
                 limit (mean_rate + SD*rate_upper_variability), which daily value cannot exceed
            rate_over_average (int|float): config param, the percentage by which daily value may not exceed
                 its corresponding mean rate
            rate_upper_limit (int|float): absolute number which daily value cannot exceed
            measure_definition (str): canva text to use for measure_definition column (see measure_definition.py)
            cond_warn_over_average (str): canva text to use for over_average cases (see conditions.py and warnings.py)
            cond_warn_upper_variability (str): canva text to use for
                upper_variability cases (see conditions.py and warnings.py)
            cond_warn_upper_limit (str): canva text to use for upper_limit cases (see conditions.py and warnings.py)
            save_data_for_plots (bool): boolean, decide whether to store daily_value and its corresponding average
                and upper variability limit for plots. Defaults to False.
        Returns:
             tuple(Union[DataFrame, None]): a tuple where the first df is used for
                the warning log table and the second df for plots
        """
        # prepare data
        # calculate mean and std over window, and based on them UCL - for UPPER_VARIABILITY check
        # ratio_perc - for OVER_AVERAGE check
        # create empty array cond_warn_condition_value column to store information about qws
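        # illustrative numbers (not from the source): with rate_over_average = 50, a daily_value of 9.0
        # against an average of 5.0 gives ratio_perc = 180.0 > 150, triggering the OVER_AVERAGE check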
        df_prep = df_temp.withColumns(
            {
                ColNames.average: psf.avg(ColNames.daily_value).over(window),
                "std_rate": psf.stddev(ColNames.daily_value).over(window),
                "ratio_perc": (psf.col(ColNames.daily_value) / psf.col(ColNames.average)) * 100,
                ColNames.UCL: psf.col(ColNames.average) + rate_upper_variability * psf.col("std_rate"),
                "cond_warn_condition_value": psf.array(),
            }
        )
        # if save_data_for_plots=True, add some new columns with constant values
        # and select only the columns applicable to SilverEventDataSyntacticQualityWarningsForPlots.SCHEMA
        # else - return None
        if save_data_for_plots:
            df_prep = df_prep.cache()
            df_plots = df_prep.withColumns(
                {
                    ColNames.lookback_period: psf.lit(self.lookback_period),
                    ColNames.type_of_qw: psf.lit("error_rate"),
                    ColNames.LCL: psf.lit(None).cast("float"),
                }
            ).select(
                self.output_qw_data_objects[SilverEventDataSyntacticQualityWarningsForPlots.ID].SCHEMA.fieldNames()
            )
        else:
            df_plots = None

        # continue with QWs checks
        # first filter temp data by period from [data_period_start-lookback_period_in_days, data_period_end]
        #  to [data_period_start, data_period_end]
        # the filter is applied after the plot block because the first days of the research period need previous data to plot
        df_qw = df_prep.filter(psf.col(ColNames.date) >= self.data_period_start)
        # perform UPPER_VARIABILITY, OVER_AVERAGE, and ABS_UPPER_LIMIT checks
        # if condition is met store information about condition-warning_text-condition_value as a string into
        # array column 'cond_warn_condition_value'
        df_qw = (
            df_qw.withColumn(
                "cond_warn_condition_value",
                psf.when(
                    psf.col(ColNames.daily_value) > psf.col(ColNames.UCL),
                    psf.array_append(
                        psf.col("cond_warn_condition_value"),
                        psf.concat(
                            psf.lit(f"{cond_warn_upper_variability}-"),
                            psf.col(ColNames.UCL).cast("string"),
                        ),
                    ),
                ).otherwise(psf.col("cond_warn_condition_value")),
            )
            .withColumn(
                "cond_warn_condition_value",
                psf.when(
                    psf.col("ratio_perc") > psf.lit(100 + rate_over_average),
                    psf.array_append(
                        psf.col("cond_warn_condition_value"),
                        psf.lit(f"{cond_warn_over_average}-{str(rate_over_average)}"),
                    ),
                ).otherwise(psf.col("cond_warn_condition_value")),
            )
            .withColumn(
                "cond_warn_condition_value",
                psf.when(
                    psf.col(ColNames.daily_value) > psf.lit(rate_upper_limit),
                    psf.array_append(
                        psf.col("cond_warn_condition_value"),
                        psf.lit(f"{cond_warn_upper_limit}-{str(rate_upper_limit)}"),
                    ),
                ).otherwise(psf.col("cond_warn_condition_value")),
            )
        )
        # explode array column 'cond_warn_condition_value'
        df_qw = df_qw.withColumn(
            "cond_warn_condition_value",
            psf.explode(psf.col("cond_warn_condition_value")),
        )
        # add some column constants
        # split "cond_warn_condition_value" column into three: condition, warning_text, condition_value to match
        # SilverEventDataSyntacticQualityWarningsLogTable.SCHEMA
        # select only needed columns
        df_qw = df_qw.withColumns(
            {
                ColNames.lookback_period: psf.lit(self.lookback_period),
                ColNames.measure_definition: psf.lit(measure_definition),
                ColNames.condition: psf.split(psf.col("cond_warn_condition_value"), "-").getItem(0),
                ColNames.warning_text: psf.split(psf.col("cond_warn_condition_value"), "-").getItem(1),
                ColNames.condition_value: psf.split(psf.col("cond_warn_condition_value"), "-").getItem(2).cast("float"),
            }
        ).select(self.output_qw_data_objects[SilverEventDataSyntacticQualityWarningsLogTable.ID].SCHEMA.fieldNames())

        return (df_qw, df_plots)

    def save_quality_warnings_output(
        self,
        dfs_qw: List[Union[DataFrame, None]],
        output_do: Union[
            SilverEventDataSyntacticQualityWarningsLogTable, SilverEventDataSyntacticQualityWarningsForPlots
        ],
    ):
        """
        Concatenates all elements in the dfs_qw list, adjusts the result to the schema of output_do, and stores it
            using the write method of output_do

        Args:
            dfs_qw (list): list of quality warning DataFrames to concatenate and write
            output_do (SilverEventDataSyntacticQualityWarningsLogTable | \
                SilverEventDataSyntacticQualityWarningsForPlots): output data object used to write the result
        """

        output_do.df = reduce(lambda x, y: x.union(y), dfs_qw)

        output_do.df = self.spark.createDataFrame(output_do.df.rdd, output_do.SCHEMA)

        output_do.write()

    def save_quality_warnings_log_table(self, dfs_qw):

        self.save_quality_warnings_output(
            dfs_qw,
            self.output_qw_data_objects[SilverEventDataSyntacticQualityWarningsLogTable.ID],
        )

    def save_quality_warnings_for_plots(self, dfs_qw):

        self.save_quality_warnings_output(
            dfs_qw,
            self.output_qw_data_objects[SilverEventDataSyntacticQualityWarningsForPlots.ID],
        )

data_size_qw(df_freq_distribution, lookback_period_in_days, variablility, lower_limit, upper_limit, type_of_data, measure_definition_canva=f'{MeasureDefinitions.size_data}', cond_warn_variability_canva=f'{Conditions.size_data_variability}-{Warnings.size_data_variability}', cond_warn_upper_lower_canva=f'{Conditions.size_data_upper_lower}-{Warnings.size_data_upper_lower}')

A unified function to check both raw and clean data sizes; calculates four types of QWs:

LOWER_VARIABILITY - for each row, using the calculated mean and std, compute the lower variability limit (mean - SD*variability) and check if daily_value is lower than this limit
UPPER_VARIABILITY - for each row, using the calculated mean and std, compute the upper variability limit (mean + SD*variability) and check if daily_value exceeds this limit
ABS_LOWER_LIMIT - check if daily_value is lower than the absolute number lower_limit
ABS_UPPER_LIMIT - check if daily_value exceeds the absolute number upper_limit

All four QWs depend on thresholds; whenever a condition is met, the cond-warn-condition_value information is stored in an array column, so some rows may have several QWs. In the end the array column is exploded and the cond-warn-condition_value information is split into three corresponding columns. The function returns almost-ready dfs for the SilverEventDataSyntacticQualityWarningsLogTable and SilverEventDataSyntacticQualityWarningsForPlots DOs.

Parameters:

df_freq_distribution (DataFrame): df with frequency data. Required.
lookback_period_in_days (int): length of the lookback period in days. Required.
variablility (Union[int, float]): config param, the number of SD used to define the upper and lower variability limits (mean_size ± SD*variability) that daily_value should stay within. Required.
lower_limit (Union[int, float]): absolute number below which daily_value should not fall. Required.
upper_limit (Union[int, float]): absolute number which daily_value cannot exceed. Required.
type_of_data (str): which type of data, raw or clean, to check for QWs. Required.
measure_definition_canva (str): canva text to use for the measure_definition column (see measure_definition.py). Default: f'{size_data}'.
cond_warn_variability_canva (str): canva text to use for lower/upper variability cases of data_size QWs (see conditions.py and warnings.py). Default: f'{size_data_variability}-{size_data_variability}'.
cond_warn_upper_lower_canva (str): canva text to use for lower/upper limit cases of data_size QWs (see conditions.py and warnings.py). Default: f'{size_data_upper_lower}-{size_data_upper_lower}'.

Returns:

tuple (DataFrame, DataFrame): a tuple where the first df is used for the warning log table and the second df for plots.
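To make the four checks concrete, here is a small, purely illustrative sketch of the arithmetic they apply; every threshold value and daily total below is an assumption, not a value from this project (in the component they come from the data_size_tresholds config entry and from the frequency-distribution metrics).

# Illustrative only: reproduce the four data-size checks on made-up numbers.
variability, lower_limit, upper_limit = 3, 1_000, 2_000_000

# mean and std of the daily totals over the lookback window
average, std_freq = 950_000.0, 40_000.0
lcl = average - variability * std_freq  # 830_000.0
ucl = average + variability * std_freq  # 1_070_000.0

daily_value = 1_100_000  # today's total initial (or final) frequency
warnings = []
if daily_value < lcl:
    warnings.append("LOWER_VARIABILITY")
if daily_value > ucl:
    warnings.append("UPPER_VARIABILITY")  # triggered: 1_100_000 > 1_070_000
if daily_value < lower_limit:
    warnings.append("ABS_LOWER_LIMIT")
if daily_value > upper_limit:
    warnings.append("ABS_UPPER_LIMIT")
# warnings == ["UPPER_VARIABILITY"]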

Source code in multimno/components/quality/event_quality_warnings/event_quality_warnings.py
def data_size_qw(
    self,
    df_freq_distribution: DataFrame,
    lookback_period_in_days: int,
    variablility: Union[int, float],
    lower_limit: Union[int, float],
    upper_limit: Union[int, float],
    type_of_data: str,
    measure_definition_canva: str = f"{MeasureDefinitions.size_data}",
    cond_warn_variability_canva: str = f"{Conditions.size_data_variability}-{Warnings.size_data_variability}",
    cond_warn_upper_lower_canva: str = f"{Conditions.size_data_upper_lower}-{Warnings.size_data_upper_lower}",
) -> Tuple[DataFrame]:
    """
    A unified function to check both raw and clean data sizes; calculates four types of QWs:
    LOWER_VARIABILITY - for each row, using the calculated mean and std, compute the lower variability limit
        (mean - SD*variability) and check if daily_value is lower than this limit
    UPPER_VARIABILITY - for each row, using the calculated mean and std, compute the upper variability limit
        (mean + SD*variability) and check if daily_value exceeds this limit
    ABS_LOWER_LIMIT - check if daily_value is lower than the absolute number lower_limit
    ABS_UPPER_LIMIT - check if daily_value exceeds the absolute number upper_limit
    All four QWs depend on thresholds; whenever a condition is met, the cond-warn-condition_value
        information is stored in an array column,
    so some rows may have several QWs. In the end the array column is exploded and the cond-warn-condition_value
        information is split into three corresponding columns.
    The function returns almost-ready dfs for the SilverEventDataSyntacticQualityWarningsLogTable
        and SilverEventDataSyntacticQualityWarningsForPlots DOs

    Args:
        df_freq_distribution (DataFrame): df with frequency data
        lookback_period_in_days (int): length of the lookback period in days
        variablility (Union[int, float]): config param, the number of SD used to define the upper and lower
            variability limits (mean_size ± SD*variability), which daily_value should stay within
        lower_limit (Union[int, float]): absolute number that daily_value should not fall below
        upper_limit (Union[int, float]): absolute number that daily_value must not exceed
        type_of_data (str): which type of data, "raw" or "clean", to check for QWs
        measure_definition_canva (str): canva text to use for measure_definition column (see measure_definition.py)
        cond_warn_variability_canva (str): canva text to use for lower_upper_variability cases
            of data_size QWs (see conditions.py and warnings.py)
        cond_warn_upper_lower_canva (str): canva text to use for lower_upper_limit cases
            of data_size QWs (see conditions.py and warnings.py)

    Returns:
        tuple(DataFrame, DataFrame): a tuple, where first df
            is used for warning log table, and the second df - for plots
    """
    # based on type_of_data calculate either total daily initial frequency or total daily final frequency
    if type_of_data == "raw":
        sum_column = ColNames.initial_frequency
    else:
        sum_column = ColNames.final_frequency
    # fill in string canvases
    measure_definition = measure_definition_canva.format(type_of_data=type_of_data)
    cond_warn_variability = cond_warn_variability_canva.format(SD=variablility, type_of_data=type_of_data)
    cond_warn_upper_lower = cond_warn_upper_lower_canva.format(
        X=lower_limit, Y=upper_limit, type_of_data=type_of_data
    )
    # define lookback period
    window = Window.orderBy(ColNames.date).rowsBetween(-lookback_period_in_days, -1)
    # prepare data
    # calculate mean and std over window, and based on them UCL - for UPPER_VARIABILITY check and LCL
    #   - for LOWER_VARIABILITY check
    # create empty array cond_warn_condition_value column to store information about qws
    df_prep = (
        df_freq_distribution.groupBy(ColNames.date)
        .agg(psf.sum(sum_column).alias(ColNames.daily_value))
        .withColumns(
            {
                ColNames.average: psf.avg(ColNames.daily_value).over(window),
                "std_freq": psf.stddev(ColNames.daily_value).over(window),
                ColNames.UCL: psf.col(ColNames.average) + variablility * psf.col("std_freq"),
                ColNames.LCL: psf.col(ColNames.average) - variablility * psf.col("std_freq"),
                "cond_warn_condition_value": psf.array(),
            }
        )
    )

    df_prep = df_prep.cache()
    # continue with QWs checks
    # first filter data by period from [data_period_start-lookback_period_in_days, data_period_end]
    # to [data_period_start, data_period_end]
    # - a specified research period of QW
    df_qw = df_prep.filter(psf.col(ColNames.date) >= self.data_period_start)
    # perform LOWER_VARIABILITY, UPPER_VARIABILITY, ABS_LOWER_LIMIT and ABS_UPPER_LIMIT checks
    # if condition is met append information about condition-warning_text-condition_value as a string
    # into array column 'cond_warn_condition_value'
    df_qw = (
        df_qw.withColumn(
            "cond_warn_condition_value",
            psf.when(
                (psf.col(ColNames.daily_value) < psf.col(ColNames.LCL)),
                psf.array_append(
                    psf.col("cond_warn_condition_value"),
                    psf.concat(
                        psf.lit(f"{cond_warn_variability}-"),
                        psf.col(ColNames.LCL).cast("string"),
                    ),
                ),
            ).otherwise(psf.col("cond_warn_condition_value")),
        )
        .withColumn(
            "cond_warn_condition_value",
            psf.when(
                (psf.col(ColNames.daily_value) > psf.col(ColNames.UCL)),
                psf.array_append(
                    psf.col("cond_warn_condition_value"),
                    psf.concat(
                        psf.lit(f"{cond_warn_variability}-"),
                        psf.col(ColNames.UCL).cast("string"),
                    ),
                ),
            ).otherwise(psf.col("cond_warn_condition_value")),
        )
        .withColumn(
            "cond_warn_condition_value",
            psf.when(
                (psf.col(ColNames.daily_value) < psf.lit(lower_limit)),
                psf.array_append(
                    psf.col("cond_warn_condition_value"),
                    psf.lit(f"{cond_warn_upper_lower}-{str(lower_limit)}"),
                ),
            ).otherwise(psf.col("cond_warn_condition_value")),
        )
        .withColumn(
            "cond_warn_condition_value",
            psf.when(
                (psf.col(ColNames.daily_value) > psf.lit(upper_limit)),
                psf.array_append(
                    psf.col("cond_warn_condition_value"),
                    psf.lit(f"{cond_warn_upper_lower}-{str(upper_limit)}"),
                ),
            ).otherwise(psf.col("cond_warn_condition_value")),
        )
    )
    # explode array column 'cond_warn_condition_value'
    df_qw = df_qw.withColumn(
        "cond_warn_condition_value",
        psf.explode(psf.col("cond_warn_condition_value")),
    )
    # add some column constants
    # split "cond_warn_condition_value" column into three: condition, wanring_text, conditon_value to
    # match SilverEventDataSyntacticQualityWarningsLogTable.SCHEMA
    # select only needed columns
    df_qw = df_qw.withColumns(
        {
            ColNames.lookback_period: psf.lit(self.lookback_period),
            ColNames.measure_definition: psf.lit(measure_definition),
            ColNames.condition: psf.split(psf.col("cond_warn_condition_value"), "-").getItem(0),
            ColNames.warning_text: psf.split(psf.col("cond_warn_condition_value"), "-").getItem(1),
            ColNames.condition_value: psf.split(psf.col("cond_warn_condition_value"), "-").getItem(2).cast("float"),
        }
    ).select(self.output_qw_data_objects[SilverEventDataSyntacticQualityWarningsLogTable.ID].SCHEMA.fieldNames())

    # save data for plots
    # no filter by date because we need previous data of first days for plots
    df_plots = df_prep.withColumns(
        {
            ColNames.lookback_period: psf.lit(self.lookback_period),
            ColNames.type_of_qw: psf.lit(f"{type_of_data}_data_size"),
        }
    ).select(self.output_qw_data_objects[SilverEventDataSyntacticQualityWarningsForPlots.ID].SCHEMA.fieldNames())

    return df_qw, df_plots
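
To make the rolling-window variability check easier to follow, here is a minimal, self-contained sketch of the same idea. It is not the component itself: the SparkSession, column names, thresholds and sample data below are illustrative assumptions only.

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as psf

spark = SparkSession.builder.getOrCreate()
variability, lower_limit, upper_limit = 3, 500, 10_000

# one row per date with an already aggregated daily_value (hypothetical numbers)
df = spark.createDataFrame(
    [("2023-01-01", 1000), ("2023-01-02", 980), ("2023-01-03", 1020),
     ("2023-01-04", 990), ("2023-01-05", 400)],
    ["date", "daily_value"],
)

# lookback window over the previous days, excluding the current row
w = Window.orderBy("date").rowsBetween(-7, -1)

checked = (
    df.withColumn("mean", psf.avg("daily_value").over(w))
    .withColumn("std", psf.stddev("daily_value").over(w))
    .withColumn("lcl", psf.col("mean") - variability * psf.col("std"))
    .withColumn("ucl", psf.col("mean") + variability * psf.col("std"))
    .withColumn("lower_variability", psf.col("daily_value") < psf.col("lcl"))
    .withColumn("upper_variability", psf.col("daily_value") > psf.col("ucl"))
    .withColumn("abs_lower_limit", psf.col("daily_value") < psf.lit(lower_limit))
    .withColumn("abs_upper_limit", psf.col("daily_value") > psf.lit(upper_limit))
)
checked.show()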

error_rate_qw(df_freq_distribution, lookback_period_in_days, variables, error_rate_over_average, error_rate_upper_variability, error_rate_upper_limit, error_rate_measure_definition_canva=f'{MeasureDefinitions.error_rate}', error_rate_cond_warn_over_average_canva=f'{Conditions.error_rate_over_average}-{Warnings.error_rate_over_average}', error_rate_cond_warn_upper_variability_canva=f'{Conditions.error_rate_upper_variability}-{Warnings.error_rate_upper_variability}', error_rate_cond_warn_upper_limit_canva=f'{Conditions.error_rate_upper_limit}-{Warnings.error_rate_upper_limit}', save_data_for_plots=False)

Prepare data for error rate calculation. First fill in the different string canvases, then define the window of aggregation and calculate error_rate with the following formula: (Total initial frequency - Total final frequency) / Total initial frequency * 100. Pass the preprocessed input to the self.rate_common_qw function, which calculates three types of QWs: OVER_AVERAGE, UPPER_VARIABILITY, and ABS_UPPER_LIMIT.

Parameters:

Name Type Description Default
df_freq_distribution DataFrame

df with frequency data.

required
lookback_period_in_days int

number of days prior to date of interest.

required
variables List[str]

list of column names by which the error rate is calculated, i.e. the granularity level

required
error_rate_over_average Union[int, float]

config param, the maximum percentage by which a daily value may exceed its corresponding mean error rate

required
error_rate_upper_variability Union[int, float]

config param, the number of SD used to define the upper variability limit (mean_rate + SD*error_rate_upper_variability), which the error rate can't exceed

required
error_rate_upper_limit Union[int, float]

absolute number that the error rate must not exceed

required
error_rate_measure_definition_canva str

canva text to use for measure_definition column (see measure_definition.py)

f'{error_rate}'
error_rate_cond_warn_over_average_canva str

canva text to use for over_average cases of error_rate QWs (see conditions.py and warnings.py)

f'{error_rate_over_average}-{error_rate_over_average}'
error_rate_cond_warn_upper_variability_canva str

canva text to use for upper_variability cases of error_rate QWs (see conditions.py and warnings.py)

f'{error_rate_upper_variability}-{error_rate_upper_variability}'
error_rate_cond_warn_upper_limit_canva str

canva text to use for upper_limit cases of error_rate QWs (see conditions.py and warnings.py)

f'{error_rate_upper_limit}-{error_rate_upper_limit}'
save_data_for_plots bool

whether to store the error rate and its corresponding average and upper variability limit for plots; defaults to False

False

Returns: tuple(Union[DataFrame, None]): a tuple where the first df is used for the warning log table and the second df is for plots (it may also be None)
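
For illustration only (the numbers are hypothetical): with a total initial frequency of 2,000,000 events and a total final frequency of 1,900,000 events after cleaning, the error rate is (2,000,000 - 1,900,000) / 2,000,000 * 100 = 5%.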

Source code in multimno/components/quality/event_quality_warnings/event_quality_warnings.py
def error_rate_qw(
    self,
    df_freq_distribution: DataFrame,
    lookback_period_in_days: int,
    variables: List[str],
    error_rate_over_average: Union[int, float],
    error_rate_upper_variability: Union[int, float],
    error_rate_upper_limit: Union[int, float],
    error_rate_measure_definition_canva: str = f"{MeasureDefinitions.error_rate}",
    error_rate_cond_warn_over_average_canva: str = f"{Conditions.error_rate_over_average}-{Warnings.error_rate_over_average}",
    error_rate_cond_warn_upper_variability_canva: str = f"{Conditions.error_rate_upper_variability}-{Warnings.error_rate_upper_variability}",
    error_rate_cond_warn_upper_limit_canva: str = f"{Conditions.error_rate_upper_limit}-{Warnings.error_rate_upper_limit}",
    save_data_for_plots: bool = False,
) -> Tuple[Union[DataFrame, None]]:
    """
    Prepare data for error rate calculation. First fill in the different string canvases,
        then define the window of aggregation, and calculate error_rate with the following formula:
        (Total initial frequency - Total final frequency) / Total initial frequency * 100.
        Pass the preprocessed input to the self.rate_common_qw function, which calculates three types
            of QWs: OVER_AVERAGE, UPPER_VARIABILITY, and ABS_UPPER_LIMIT

    Args:
        df_freq_distribution (DataFrame): df with frequency data.
        lookback_period_in_days (int): number of days prior to date of interest.
        variables (List[str]): list of column names by which the error rate is calculated, i.e. the granularity level
        error_rate_over_average (Union[int, float]): config param, the maximum percentage by which a daily value
            may exceed its corresponding mean error rate
        error_rate_upper_variability (Union[int, float]): config param, the number of SD used to define the upper
            variability limit (mean_rate + SD*error_rate_upper_variability), which the error rate can't exceed
        error_rate_upper_limit (Union[int, float]): absolute number that the error rate must not exceed
        error_rate_measure_definition_canva (str): canva text to use for measure_definition
            column (see measure_definition.py)
        error_rate_cond_warn_over_average_canva (str): canva text to use for over_average cases of
            error_rate QWs (see conditions.py and warnings.py)
        error_rate_cond_warn_upper_variability_canva (str): canva text to use for upper_variability cases of
            error_rate QWs (see conditions.py and warnings.py)
        error_rate_cond_warn_upper_limit_canva (str): canva text to use for upper_limit cases of error_rate
            QWs (see conditions.py and warnings.py)
        save_data_for_plots (bool): whether to store the error rate and its corresponding average
            and upper variability limit for plots; defaults to False
    Returns:
        tuple(Union[DataFrame, None]): a tuple, where first df is used for warning log table,
            and the second df - for plots (could be also None)
    """
    # fill in all string constants with relevant information
    # based on variables, error_rate_over_average, error_rate_upper_variability, error_rate_upper_limit args
    error_rate_measure_definition = error_rate_measure_definition_canva.format(variables="&".join(variables))

    error_rate_cond_warn_over_average = error_rate_cond_warn_over_average_canva.format(
        variables="&".join(variables), X=error_rate_over_average
    )
    error_rate_cond_warn_upper_variability = error_rate_cond_warn_upper_variability_canva.format(
        variables="&".join(variables), SD=error_rate_upper_variability
    )
    error_rate_cond_warn_upper_limit = error_rate_cond_warn_upper_limit_canva.format(
        variables="&".join(variables), X=error_rate_upper_limit
    )
    # qws of error rate by date is calculated based on previous days
    if variables == [ColNames.date]:
        window = Window.orderBy(ColNames.date).rowsBetween(-lookback_period_in_days, -1)
    else:
        # qws of error rate by date and other column(s) are calculated over all "data points" of the same date
        window = Window.partitionBy(ColNames.date)
    # calculate error rate, a.k.a daily_value
    df_qw = (
        df_freq_distribution.groupBy(*variables)
        .agg(
            psf.sum(ColNames.initial_frequency).alias("sum_init_freq"),
            psf.sum(ColNames.final_frequency).alias("sum_final_freq"),
        )
        .withColumn(
            ColNames.daily_value,
            (psf.col("sum_init_freq") - psf.col("sum_final_freq")) / psf.col("sum_init_freq") * 100,
        )
    )
    # using the self.rate_common_qw function calculate three types of QWs
    #   -> OVER_AVERAGE, UPPER_VARIABILITY, and ABS_UPPER_LIMIT
    # return a tuple of (df_log, df_plots | None)
    qw_result = self.rate_common_qw(
        df_qw,
        window,
        error_rate_upper_variability,
        error_rate_over_average,
        error_rate_upper_limit,
        error_rate_measure_definition,
        error_rate_cond_warn_upper_variability,
        error_rate_cond_warn_over_average,
        error_rate_cond_warn_upper_limit,
        save_data_for_plots,
    )

    return qw_result
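
As an illustration of how the canvases are filled in (the template below is made up for this example; the real texts live in conditions.py and warnings.py): a canva such as "Error rate by {variables} exceeded its average by more than {X}%", formatted with variables="date" and X=25, would yield "Error rate by date exceeded its average by more than 25%".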

error_type_rate_qw(df_qa_by_column, df_freq_distribution, field_name, error_type, lookback_period_in_days, error_type_rate_over_average, error_type_rate_upper_variability, error_type_rate_upper_limit, error_type_rate_measure_definition_canva=f'{MeasureDefinitions.error_type_rate}', error_type_rate_cond_warn_over_average_canva=f'{Conditions.error_type_rate_over_average}-{Warnings.error_type_rate_over_average}', error_type_rate_cond_warn_upper_variability_canva=f'{Conditions.error_type_rate_upper_variability}-{Warnings.error_type_rate_upper_variability}', error_type_rate_cond_warn_upper_limit_canva=f'{Conditions.error_type_rate_upper_limit}-{Warnings.error_type_rate_upper_limit}')

Prepare data for error type rate calculation. First fill in the different string canvases, then, based on the field name and error type, calculate their corresponding error rate using the formula: number of errors of this error_type & field_name combination / Total initial frequency * 100 (by date). Pass the preprocessed input, along with a window spanning the lookback period, to the self.rate_common_qw function, which calculates three types of QWs: OVER_AVERAGE, UPPER_VARIABILITY, and ABS_UPPER_LIMIT.

Parameters:

Name Type Description Default
df_qa_by_column DataFrame

df with qa by column data.

required
df_freq_distribution DataFrame

df with frequency data.

required
field_name str | None

config param, the name of the column for which to check the error_type.

required
error_type str

config param, the name of error type.

required
lookback_period_in_days int

number of days prior to the date of interest.

required
error_type_rate_over_average Union[int, float]

config param, the maximum percentage by which a daily value may exceed its corresponding mean error rate.

required
error_type_rate_upper_variability Union[int, float]

config param, the number of SD used to define the upper variability limit (mean_rate + SD*error_type_rate_upper_variability), which the daily value can not exceed

required
error_type_rate_upper_limit Union[int, float]

absolute number that the daily value must not exceed

required
error_type_rate_measure_definition_canva str

canva text to use for measure_definition column (see measure_definition.py)

f'{error_type_rate}'
error_type_rate_cond_warn_over_average_canva str

canva text to use for over_average cases of error_type_rate QWs (see conditions.py and warnings.py)

f'{error_type_rate_over_average}-{error_type_rate_over_average}'
error_type_rate_cond_warn_upper_variability_canva str

canva text to use for upper_variability cases of error_type_rate QWs (see conditions.py and warnings.py)

f'{error_type_rate_upper_variability}-{error_type_rate_upper_variability}'
error_type_rate_cond_warn_upper_limit_canva str

canva text to use for upper_limit cases of error_type_rate QWs (see conditions.py and warnings.py)

f'{error_type_rate_upper_limit}-{error_type_rate_upper_limit}'

Returns: tuple(Union[DataFrame, None]): a tuple where the first df is used for the warning log table and the second element is for plots; since save_data_for_plots is always False here, the second element is None
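
For illustration only (the numbers are hypothetical): if 40,000 events on a given date carry the checked error type for the checked field and the total initial frequency for that date is 2,000,000 events, the error type rate is 40,000 / 2,000,000 * 100 = 2%.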

Source code in multimno/components/quality/event_quality_warnings/event_quality_warnings.py
def error_type_rate_qw(
    self,
    df_qa_by_column: DataFrame,
    df_freq_distribution: DataFrame,
    field_name: Union[str, None],
    error_type: str,
    lookback_period_in_days: int,
    error_type_rate_over_average: Union[int, float],
    error_type_rate_upper_variability: Union[int, float],
    error_type_rate_upper_limit: Union[int, float],
    error_type_rate_measure_definition_canva: str = f"{MeasureDefinitions.error_type_rate}",
    error_type_rate_cond_warn_over_average_canva: str = f"{Conditions.error_type_rate_over_average}-{Warnings.error_type_rate_over_average}",
    error_type_rate_cond_warn_upper_variability_canva: str = f"{Conditions.error_type_rate_upper_variability}-{Warnings.error_type_rate_upper_variability}",
    error_type_rate_cond_warn_upper_limit_canva: str = f"{Conditions.error_type_rate_upper_limit}-{Warnings.error_type_rate_upper_limit}",
) -> Tuple[Union[DataFrame, None]]:
    """
    Prepare data for error type rate calculation. First fill in the different string canvases, then, based
        on the field name and error type, calculate their corresponding error rate using the formula:
        number of errors of this error_type&field_name combo / Total initial frequency * 100 (BY DATE).
        Pass the preprocessed input, along with a window spanning the lookback period,
        to the self.rate_common_qw function, which calculates three types of QWs:
        OVER_AVERAGE, UPPER_VARIABILITY, and ABS_UPPER_LIMIT


    Args:
        df_qa_by_column (DataFrame): df with qa by column data.
        df_freq_distribution (DataFrame): df with frequency data.
        field_name (str | None): config param, the name of the column for which to check the error_type.
        error_type (str): config param, the name of error type.
        lookback_period_in_days (int): number of days prior to the date of interest.
        error_type_rate_over_average (Union[int, float]): config param, the maximum percentage by which a daily
            value may exceed its corresponding mean error rate.
        error_type_rate_upper_variability (Union[int, float]): config param, the number of SD used to define the upper
            variability limit (mean_rate + SD*error_type_rate_upper_variability), which the daily value can not exceed
        error_type_rate_upper_limit (Union[int, float]): absolute number that the daily value must not exceed
        error_type_rate_measure_definition_canva (str): canva text to use for measure_definition
            column (see measure_definition.py)
        error_type_rate_cond_warn_over_average_canva (str): canva text to use for over_average cases of
            error_type_rate QWs (see conditions.py and warnings.py)
        error_type_rate_cond_warn_upper_variability_canva (str): canva text to use for upper_variability
            cases of error_type_rate QWs (see conditions.py and warnings.py)
        error_type_rate_cond_warn_upper_limit_canva (str): canva text to use for upper_limit cases of
            error_type_rate QWs (see conditions.py and warnings.py)
    Returns:
        tuple(Union[DataFrame, None]): a tuple, where first df is used for warning log table,
            and the second df - for plots, but since save_data_for_plots always False, output=None
    """
    # based on error_type, error_type_rate_over_average, error_type_rate_upper_variability,
    #  error_type_rate_upper_limit
    # fill in string canvases
    colname_error_type, error_type_qw_name = self.dict_error_type_info[error_type]

    error_type_rate_measure_definition = error_type_rate_measure_definition_canva.format(
        error_type_name=error_type_qw_name, field_name=str(field_name)
    )

    error_type_rate_cond_warn_over_average = error_type_rate_cond_warn_over_average_canva.format(
        error_type_name=error_type_qw_name,
        field_name=str(field_name),
        X=error_type_rate_over_average,
    )
    error_type_rate_cond_warn_upper_variability = error_type_rate_cond_warn_upper_variability_canva.format(
        error_type_name=error_type_qw_name,
        field_name=str(field_name),
        SD=error_type_rate_upper_variability,
    )
    error_type_rate_cond_warn_upper_limit = error_type_rate_cond_warn_upper_limit_canva.format(
        error_type_name=error_type_qw_name,
        field_name=str(field_name),
        X=error_type_rate_upper_limit,
    )
    # for error_types that have more than one applicable column
    # filter df_qa_by_column by field_name and error_type
    if field_name is not None:
        df_qa_by_column = df_qa_by_column.filter(
            (psf.col(ColNames.variable) == field_name) & (psf.col(ColNames.type_of_error) == colname_error_type)
        ).select(ColNames.date, ColNames.value)
    else:
        # for error_types which technically do not belong specifically to one of event
        # columns filter only by error_type (e.g. no_location error_type)
        df_qa_by_column = df_qa_by_column.filter(psf.col(ColNames.type_of_error) == colname_error_type).select(
            ColNames.date, ColNames.value
        )
    # calculate total daily initial frequency
    df_freq_distribution = (
        df_freq_distribution.groupby(ColNames.date)
        .agg(psf.sum(ColNames.initial_frequency).alias("sum_init_freq"))
        .select(ColNames.date, "sum_init_freq")
    )
    # for each date combine two type of information number of errors and total daily initial frequency
    df_combined = df_qa_by_column.join(df_freq_distribution, on=ColNames.date, how="inner")
    # for each date calculate error_type_rate, a.k.a daily_value
    df_temp = df_combined.withColumn(
        ColNames.daily_value,
        (psf.col(ColNames.value) / psf.col("sum_init_freq")) * 100,
    )

    # qws will be calculated based on previous days
    window = Window.orderBy(ColNames.date).rowsBetween(-lookback_period_in_days, -1)
    # using the self.rate_common_qw function calculate three types of QWs ->
    # OVER_AVERAGE, UPPER_VARIABILITY, and ABS_UPPER_LIMIT
    # return a tuple of (df_log, None)
    qw_result = self.rate_common_qw(
        df_temp,
        window,
        error_type_rate_upper_variability,
        error_type_rate_over_average,
        error_type_rate_upper_limit,
        error_type_rate_measure_definition,
        error_type_rate_cond_warn_upper_variability,
        error_type_rate_cond_warn_over_average,
        error_type_rate_cond_warn_upper_limit,
    )
    return qw_result

rate_common_qw(df_temp, window, rate_upper_variability, rate_over_average, rate_upper_limit, measure_definition, cond_warn_upper_variability, cond_warn_over_average, cond_warn_upper_limit, save_data_for_plots=False)

Takes an input df with a "daily_value" column and calculates three types of QWs:
OVER_AVERAGE - for each row, first take the mean of values over the specified window, then check if daily_value exceeds that mean by more than rate_over_average percent.
UPPER_VARIABILITY - for each row, using the already calculated mean, compute the upper variability limit (mean + SD*rate_upper_variability) and check if daily_value exceeds it.
ABS_UPPER_LIMIT - check if daily_value exceeds the absolute number rate_upper_limit.
All three QWs depend on the specified thresholds: if daily_value exceeds one of the calculated values, the cond-warn-condition_value information is stored in an array column, so some rows may have several QWs. In the end the array column is exploded and the cond-warn-condition_value information is split into three corresponding columns. The function returns an almost-ready df for the SilverEventDataSyntacticQualityWarningsLogTable DO and, depending on the save_data_for_plots arg, either almost-ready data for plots or None.

Parameters:

Name Type Description Default
df_temp DataFrame

temporary data that must have a daily_value column to be used in further QW calculations

required
window Window

a window within which to perform the aggregation

required
rate_upper_variability int | float

config param, the number of SD used to define the upper variability limit (mean_rate + SD*rate_upper_variability), which the daily value can not exceed

required
rate_over_average int | float

config param, the maximum percentage by which a daily value may exceed its corresponding mean error rate

required
rate_upper_limit int | float

absolute number that the daily value must not exceed

required
measure_definition str

canva text to use for measure_definition column (see measure_definition.py)

required
cond_warn_over_average str

canva text to use for over_average cases (see conditions.py and warnings.py)

required
cond_warn_upper_variability str

canva text to use for upper_variability cases (see conditions.py and warnings.py)

required
cond_warn_upper_limit str

canva text to use for upper_limit cases (see conditions.py and warnings.py)

required
save_data_for_plots bool

whether to store daily_value and its corresponding average and upper variability limit for plots. Defaults to False.

False

Returns: tuple(Union[DataFrame, None]): a tuple where the first df is used for the warning log table and the second element is the df for plots (or None when save_data_for_plots is False)
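
A worked example of the OVER_AVERAGE check (hypothetical numbers): with a mean rate of 4% over the lookback window and rate_over_average = 25, the ratio_perc of a daily value of 5.2% is 5.2 / 4 * 100 = 130, which is greater than 100 + 25 = 125, so the OVER_AVERAGE warning is raised.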

Source code in multimno/components/quality/event_quality_warnings/event_quality_warnings.py
def rate_common_qw(
    self,
    df_temp: DataFrame,
    window: Window,
    rate_upper_variability: Union[int, float],
    rate_over_average: Union[int, float],
    rate_upper_limit: Union[int, float],
    measure_definition: str,
    cond_warn_upper_variability: str,
    cond_warn_over_average: str,
    cond_warn_upper_limit: str,
    save_data_for_plots: bool = False,
) -> Tuple[Union[DataFrame, None]]:
    """
    Takes an input df with a "daily_value" column and calculates three types of QWs:
    OVER_AVERAGE - for each row, first take the mean of values over the specified window, then check if
        daily_value exceeds that mean by more than rate_over_average percent
    UPPER_VARIABILITY - for each row, using the already calculated mean, compute the upper variability limit
        (mean + SD*rate_upper_variability) and check if daily_value exceeds it
    ABS_UPPER_LIMIT - check if daily_value exceeds the absolute number rate_upper_limit
    All three QWs depend on the specified thresholds: if daily_value exceeds one of the calculated values,
        the cond-warn-condition_value information is stored in an array column,
    so some rows may have several QWs. In the end the array column is exploded and the cond-warn-condition_value
        information is split into three corresponding columns.
    The function returns an almost-ready df for the SilverEventDataSyntacticQualityWarningsLogTable DO and, based
        on the save_data_for_plots arg, returns either almost-ready data for plots or None

    Args:
        df_temp (DataFrame): temporary data that must have a daily_value column to
            be used in further QW calculations
        window (Window): a window within which to perform the aggregation
        rate_upper_variability (int|float): config param, the number of SD used to define the upper variability
             limit (mean_rate + SD*rate_upper_variability), which the daily value can not exceed
        rate_over_average (int|float): config param, the maximum percentage by which a daily value
             may exceed its corresponding mean error rate
        rate_upper_limit (int|float): absolute number that the daily value must not exceed
        measure_definition (str): canva text to use for measure_definition column (see measure_definition.py)
        cond_warn_over_average (str): canva text to use for over_average cases (see conditions.py and warnings.py)
        cond_warn_upper_variability (str): canva text to use for
            upper_variability cases (see conditions.py and warnings.py)
        cond_warn_upper_limit (str): canva text to use for upper_limit cases (see conditions.py and warnings.py)
        save_data_for_plots (bool): whether to store daily_value and its corresponding average
            and upper variability limit for plots. Defaults to False.
    Returns:
         tuple(Union[DataFrame, None]): a tuple, where first df is used for
            warning log table, and the second df - for plots
    """
    # prepare data
    # calculate mean and std over window, and based on them UCL - for UPPER_VARIABILITY check
    # ratio_perc - for OVER_AVERAGE check
    # create empty array cond_warn_condition_value column to store information about qws
    df_prep = df_temp.withColumns(
        {
            ColNames.average: psf.avg(ColNames.daily_value).over(window),
            "std_rate": psf.stddev(ColNames.daily_value).over(window),
            "ratio_perc": (psf.col(ColNames.daily_value) / psf.col(ColNames.average)) * 100,
            ColNames.UCL: psf.col(ColNames.average) + rate_upper_variability * psf.col("std_rate"),
            "cond_warn_condition_value": psf.array(),
        }
    )
    # if save_data_for_plots=True, add some new columns with constant values
    # and select only applicable to SilverEventDataSyntacticQualityWarningsForPlots.SCHEMA
    # else - return None
    if save_data_for_plots:
        df_prep = df_prep.cache()
        df_plots = df_prep.withColumns(
            {
                ColNames.lookback_period: psf.lit(self.lookback_period),
                ColNames.type_of_qw: psf.lit("error_rate"),
                ColNames.LCL: psf.lit(None).cast("float"),
            }
        ).select(
            self.output_qw_data_objects[SilverEventDataSyntacticQualityWarningsForPlots.ID].SCHEMA.fieldNames()
        )
    else:
        df_plots = None

    # continue with QWs checks
    # first filter temp data by period from [data_period_start-lookback_period_in_days, data_period_end]
    #  to [data_period_start, data_period_end]
    # the filter is applied after the plot block because the first days of the research period need previous data to plot
    df_qw = df_prep.filter(psf.col(ColNames.date) >= self.data_period_start)
    # perform UPPER_VARIABILITY, OVER_AVERAGE, and ABS_UPPER_LIMIT checks
    # if condition is met store information about condition-warning_text-condition_value as a string into
    # array column 'cond_warn_condition_value'
    df_qw = (
        df_qw.withColumn(
            "cond_warn_condition_value",
            psf.when(
                psf.col(ColNames.daily_value) > psf.col(ColNames.UCL),
                psf.array_append(
                    psf.col("cond_warn_condition_value"),
                    psf.concat(
                        psf.lit(f"{cond_warn_upper_variability}-"),
                        psf.col(ColNames.UCL).cast("string"),
                    ),
                ),
            ).otherwise(psf.col("cond_warn_condition_value")),
        )
        .withColumn(
            "cond_warn_condition_value",
            psf.when(
                psf.col("ratio_perc") > psf.lit(100 + rate_over_average),
                psf.array_append(
                    psf.col("cond_warn_condition_value"),
                    psf.lit(f"{cond_warn_over_average}-{str(rate_over_average)}"),
                ),
            ).otherwise(psf.col("cond_warn_condition_value")),
        )
        .withColumn(
            "cond_warn_condition_value",
            psf.when(
                psf.col(ColNames.daily_value) > psf.lit(rate_upper_limit),
                psf.array_append(
                    psf.col("cond_warn_condition_value"),
                    psf.lit(f"{cond_warn_upper_limit}-{str(rate_upper_limit)}"),
                ),
            ).otherwise(psf.col("cond_warn_condition_value")),
        )
    )
    # explode array column 'cond_warn_condition_value'
    df_qw = df_qw.withColumn(
        "cond_warn_condition_value",
        psf.explode(psf.col("cond_warn_condition_value")),
    )
    # add some column constants
    # split "cond_warn_condition_value" column into three: condition, wanring_text, conditon_value to match
    # SilverEventDataSyntacticQualityWarningsLogTable.SCHEMA
    # select only needed columns
    df_qw = df_qw.withColumns(
        {
            ColNames.lookback_period: psf.lit(self.lookback_period),
            ColNames.measure_definition: psf.lit(measure_definition),
            ColNames.condition: psf.split(psf.col("cond_warn_condition_value"), "-").getItem(0),
            ColNames.warning_text: psf.split(psf.col("cond_warn_condition_value"), "-").getItem(1),
            ColNames.condition_value: psf.split(psf.col("cond_warn_condition_value"), "-").getItem(2).cast("float"),
        }
    ).select(self.output_qw_data_objects[SilverEventDataSyntacticQualityWarningsLogTable.ID].SCHEMA.fieldNames())

    return (df_qw, df_plots)
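
The array-append / explode / split pattern used above (and in data_size_qw) can be isolated into a short sketch. This is illustrative only: the warning string, threshold and column names are made up, and the empty array is explicitly cast to array<string> to keep the snippet self-contained.

from pyspark.sql import SparkSession
from pyspark.sql import functions as psf

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("2023-01-01", 12.5)], ["date", "daily_value"])

# start with an empty array and append one "condition-warning-value" string per triggered check
df = df.withColumn("cond_warn_condition_value", psf.array().cast("array<string>"))
df = df.withColumn(
    "cond_warn_condition_value",
    psf.when(
        psf.col("daily_value") > 10,
        psf.array_append(
            psf.col("cond_warn_condition_value"),
            psf.lit("RATE_GT_LIMIT-rate exceeded the upper limit-10"),
        ),
    ).otherwise(psf.col("cond_warn_condition_value")),
)

# one output row per triggered warning, then split the string into its three parts
df = df.withColumn("cond_warn_condition_value", psf.explode("cond_warn_condition_value"))
df = df.withColumns(
    {
        "condition": psf.split("cond_warn_condition_value", "-").getItem(0),
        "warning_text": psf.split("cond_warn_condition_value", "-").getItem(1),
        "condition_value": psf.split("cond_warn_condition_value", "-").getItem(2).cast("float"),
    }
)
df.show(truncate=False)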

save_quality_warnings_output(dfs_qw, output_do)

Concatenates all elements in the dfs_qw list, adjusts the result to the schema of output_do, and stores it using the write method of output_do.

Parameters:

Name Type Description Default
dfs_qw list

list of quality warning DataFrames to concatenate and write

required
output_do SilverEventDataSyntacticQualityWarningsLogTable | SilverEventDataSyntacticQualityWarningsForPlots

output data object whose SCHEMA and write method are used to store the result

required
Source code in multimno/components/quality/event_quality_warnings/event_quality_warnings.py
def save_quality_warnings_output(
    self,
    dfs_qw: List[Union[DataFrame, None]],
    output_do: Union[
        SilverEventDataSyntacticQualityWarningsLogTable, SilverEventDataSyntacticQualityWarningsForPlots
    ],
):
    """
    Concatenates all elements in the dfs_qw list, adjusts the result to the schema of output_do, and stores it
        using the write method of output_do

    Args:
        dfs_qw (list): list of quality warning DataFrames to concatenate and write
        output_do (SilverEventDataSyntacticQualityWarningsLogTable | \
            SilverEventDataSyntacticQualityWarningsForPlots): output data object whose SCHEMA and write
            method are used to store the result
    """

    output_do.df = reduce(lambda x, y: x.union(y), dfs_qw)

    output_do.df = self.spark.createDataFrame(output_do.df.rdd, output_do.SCHEMA)

    output_do.write()
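
A minimal sketch of the concatenate-and-conform step with plain DataFrames instead of the project's data objects; the schema, sample rows and output path below are illustrative assumptions.

from functools import reduce
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType

spark = SparkSession.builder.getOrCreate()
schema = StructType(
    [
        StructField("date", StringType(), True),
        StructField("condition_value", FloatType(), True),
    ]
)
df_a = spark.createDataFrame([("2023-01-01", 1.0)], schema)
df_b = spark.createDataFrame([("2023-01-02", 2.0)], schema)

# concatenate all partial QW results positionally, as save_quality_warnings_output does
combined = reduce(lambda x, y: x.union(y), [df_a, df_b])

# re-create the DataFrame against the target schema so field order, types and nullability match exactly
conformed = spark.createDataFrame(combined.rdd, schema)

# write wherever the output data object points to (the path here is illustrative)
conformed.write.mode("overwrite").parquet("/tmp/quality_warnings_example")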