continuous_time_segmentation

Module that implements the Continuous Time Segmentation functionality

ContinuousTimeSegmentation

Bases: Component

A class to aggregate events into time segments.

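A minimal, hypothetical usage sketch (the config paths below are placeholders): the component is driven entirely by its configuration section, which, as the constructor and initalize_data_objects below show, reads data_period_start, data_period_end, min_time_stay_s, max_time_missing_stay_s, max_time_missing_move_s, max_time_missing_abroad_s, pad_time_s, event_error_flags_to_include, local_mcc, is_first_run and clear_destination_directory.

from multimno.components.execution.time_segments.continuous_time_segmentation import ContinuousTimeSegmentation

# Placeholder paths; real pipelines pass the paths used by the multimno orchestrator.
general_config_path = "config/general_config.ini"
component_config_path = "config/continuous_time_segmentation.ini"

component = ContinuousTimeSegmentation(general_config_path, component_config_path)
component.execute()  # processes every date in [data_period_start, data_period_end]

Whether initalize_data_objects() must be called explicitly or is invoked by the Component base class depends on the framework.
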
Source code in multimno/components/execution/time_segments/continuous_time_segmentation.py
class ContinuousTimeSegmentation(Component):
    """
    A class to aggregate events into time segments.
    """

    COMPONENT_ID = "ContinuousTimeSegmentation"

    def __init__(self, general_config_path: str, component_config_path: str) -> None:
        super().__init__(general_config_path, component_config_path)

        self.data_period_start = datetime.strptime(
            self.config.get(self.COMPONENT_ID, "data_period_start"), "%Y-%m-%d"
        ).date()
        self.data_period_end = datetime.strptime(
            self.config.get(self.COMPONENT_ID, "data_period_end"), "%Y-%m-%d"
        ).date()

        self.min_time_stay = timedelta(seconds=self.config.getint(self.COMPONENT_ID, "min_time_stay_s"))
        self.max_time_missing_stay = timedelta(seconds=self.config.getint(self.COMPONENT_ID, "max_time_missing_stay_s"))
        self.max_time_missing_move = timedelta(seconds=self.config.getint(self.COMPONENT_ID, "max_time_missing_move_s"))
        self.max_time_missing_abroad = timedelta(
            seconds=self.config.getint(self.COMPONENT_ID, "max_time_missing_abroad_s")
        )
        self.pad_time = timedelta(seconds=self.config.getint(self.COMPONENT_ID, "pad_time_s"))

        self.event_error_flags_to_include = self.config.geteval(self.COMPONENT_ID, "event_error_flags_to_include")
        self.local_mcc = self.config.getint(self.COMPONENT_ID, "local_mcc")
        # this is for UDF
        self.segmentation_return_schema = StructType(
            [
                StructField(ColNames.start_timestamp, TimestampType()),
                StructField(ColNames.end_timestamp, TimestampType()),
                StructField(ColNames.cells, ArrayType(StringType())),
                StructField(ColNames.state, ByteType()),
                StructField(ColNames.is_last, BooleanType()),
                StructField(ColNames.time_segment_id, StringType()),
                StructField(ColNames.user_id, StringType()),
                StructField(ColNames.mcc, ShortType()),
                StructField(ColNames.mnc, StringType()),
                StructField(ColNames.plmn, IntegerType()),
                StructField(ColNames.user_id_modulo, IntegerType()),
            ]
        )

        self.data_period_dates = [
            self.data_period_start + timedelta(days=i)
            for i in range((self.data_period_end - self.data_period_start).days + 1)
        ]
        self.last_time_segments = None
        self.current_date = None

    def initalize_data_objects(self):

        # Input
        self.input_data_objects = {}
        self.is_first_run = self.config.getboolean(self.COMPONENT_ID, "is_first_run")

        inputs = {
            "event_data_silver_flagged": SilverEventFlaggedDataObject,
            "cell_intersection_groups_data_silver": SilverCellIntersectionGroupsDataObject,
        }
        if not self.is_first_run:
            inputs["time_segments_silver"] = SilverTimeSegmentsDataObject

        for key, value in inputs.items():
            path = self.config.get(CONFIG_SILVER_PATHS_KEY, key)
            if check_if_data_path_exists(self.spark, path):
                self.input_data_objects[value.ID] = value(self.spark, path)
            else:
                self.logger.warning(f"Expected path {path} to exist but it does not")
                raise ValueError(f"Invalid path for {value.ID}: {path}")

        # Output
        self.output_data_objects = {}
        self.output_silver_time_segments_path = self.config.get(CONFIG_SILVER_PATHS_KEY, "time_segments_silver")
        self.output_data_objects[SilverTimeSegmentsDataObject.ID] = SilverTimeSegmentsDataObject(
            self.spark,
            self.output_silver_time_segments_path,
        )

        # Output clearing
        clear_destination_directory = self.config.getboolean(
            self.COMPONENT_ID, "clear_destination_directory", fallback=False
        )
        if clear_destination_directory:
            delete_file_or_folder(self.spark, self.output_silver_time_segments_path)
        # TODO add optional date-limited deletion when not first run,
        # but consider that segments are also generated for one additional day before the start date

    @get_execution_stats
    def execute(self):
        self.logger.info(f"Starting {self.COMPONENT_ID}...")

        # for every date in the data period, get the events and the intersection groups
        # for that date and calculate the time segments
        for current_date in self.data_period_dates:
            self.logger.info(f"Processing events for {current_date.strftime('%Y-%m-%d')}")
            self.current_date = current_date
            self.read()

            self.current_input_events_sdf = self.input_data_objects[SilverEventFlaggedDataObject.ID].df.filter(
                (F.make_date(F.col(ColNames.year), F.col(ColNames.month), F.col(ColNames.day)) == F.lit(current_date))
                & (F.col(ColNames.error_flag).isin(self.event_error_flags_to_include))
            )

            self.current_interesection_groups_sdf = (
                self.input_data_objects[SilverCellIntersectionGroupsDataObject.ID]
                .df.filter(
                    (
                        F.make_date(
                            F.col(ColNames.year),
                            F.col(ColNames.month),
                            F.col(ColNames.day),
                        )
                        == F.lit(current_date)
                    )
                )
                .select(ColNames.cell_id, ColNames.overlapping_cell_ids, ColNames.year, ColNames.month, ColNames.day)
            )

            # If segments were already calculated and this is a continuation of a previous run,
            # we need to get the last time segment of each user from the previous date.
            # On the first run there are no previous segments, so last_time_segments stays None.
            if not self.is_first_run:

                previous_date = current_date - timedelta(days=1)
                self.last_time_segments = self.input_data_objects[SilverTimeSegmentsDataObject.ID].df.filter(
                    (
                        F.make_date(F.col(ColNames.year), F.col(ColNames.month), F.col(ColNames.day))
                        == F.lit(previous_date)
                    )
                    & (F.col(ColNames.is_last) == True)
                )

            self.transform()
            self.write()
            self.input_data_objects[SilverTimeSegmentsDataObject.ID] = self.output_data_objects[
                SilverTimeSegmentsDataObject.ID
            ]
            self.is_first_run = False

        self.logger.info(f"Finished {self.COMPONENT_ID}")

    def transform(self):
        self.logger.info(f"Transform method {self.COMPONENT_ID}")

        current_events_sdf = self.current_input_events_sdf
        last_time_segments_sdf = self.last_time_segments
        intersections_groups_df = self.current_interesection_groups_sdf

        # Add overlapping_cell_ids list to each event
        current_events_sdf = (
            current_events_sdf.alias("df1")
            .join(
                intersections_groups_df.alias("df2"),
                on=[ColNames.year, ColNames.month, ColNames.day, ColNames.cell_id],
                how="left",
            )
            .select(
                f"df1.{ColNames.user_id}",
                f"df1.{ColNames.timestamp}",
                f"df1.{ColNames.mcc}",
                f"df1.{ColNames.mnc}",
                f"df1.{ColNames.plmn}",
                f"df1.{ColNames.cell_id}",
                f"df1.{ColNames.user_id_modulo}",
                ColNames.overlapping_cell_ids,
            )
        )

        if last_time_segments_sdf is None:
            current_events_sdf = (
                current_events_sdf.withColumn(ColNames.end_timestamp, F.lit(None).cast(TimestampType()))
                .withColumn(ColNames.cells, F.lit(None))
                .withColumn(ColNames.state, F.lit(None))
                .withColumn("segment_plmn", F.lit(None))
            )
        else:
            last_time_segments_sdf = last_time_segments_sdf.select(
                ColNames.end_timestamp,
                ColNames.cells,
                ColNames.state,
                ColNames.user_id,
                F.col(ColNames.mcc).alias("segment_mcc"),
                F.col(ColNames.mnc).alias("segment_mnc"),
                F.col(ColNames.plmn).alias("segment_plmn"),
                ColNames.user_id_modulo,
            )
            current_events_sdf = current_events_sdf.join(
                F.broadcast(last_time_segments_sdf),
                on=[ColNames.user_id_modulo, ColNames.user_id],
                how="outer",
            )

            current_events_sdf = (
                current_events_sdf.withColumn(
                    ColNames.mcc, F.coalesce(F.col(ColNames.mcc), F.col("segment_mcc"))
                ).withColumn(ColNames.mnc, F.coalesce(F.col(ColNames.mnc), F.col("segment_mnc")))
            ).drop("segment_mcc", "segment_mnc")

        # TODO add first event(s?) from next date to current events to handle last segment of date

        # TODO: This conversion is needed for Pandas serialisation/deserialisation;
        # remove it once user_id is stored as a string rather than as binary
        current_events_sdf = current_events_sdf.withColumn(ColNames.user_id, F.hex(F.col(ColNames.user_id)))

        current_events_sdf = current_events_sdf.withColumn(
            "is_abroad_event",
            (F.col(ColNames.plmn).isNotNull()) & (F.col(ColNames.plmn).substr(1, 3) != F.lit(self.local_mcc)),
        )

        # Partial function to pass the current date and other parameters to the aggregation function
        aggregate_segments_partial = partial(
            self.aggregate_segments,
            current_date=self.current_date,
            min_time_stay=self.min_time_stay,
            max_time_missing_stay=self.max_time_missing_stay,
            max_time_missing_move=self.max_time_missing_move,
            max_time_missing_abroad=self.max_time_missing_abroad,
            pad_time=self.pad_time,
        )

        # TODO: Test this approach with large datasets; it might not be feasible
        current_segments_sdf = current_events_sdf.groupby(ColNames.user_id_modulo, ColNames.user_id).applyInPandas(
            aggregate_segments_partial, self.segmentation_return_schema
        )

        current_segments_sdf = current_segments_sdf.withColumns(
            {
                ColNames.year: F.year(ColNames.start_timestamp).cast("smallint"),
                ColNames.month: F.month(ColNames.start_timestamp).cast("tinyint"),
                ColNames.day: F.dayofmonth(ColNames.start_timestamp).cast("tinyint"),
            }
        )

        # TODO: This conversion is needed to get back to binary after Pandas serialisation/deserialisation;
        # remove it once user_id is stored as a string rather than as binary
        current_segments_sdf = current_segments_sdf.withColumn(ColNames.user_id, F.unhex(F.col(ColNames.user_id)))

        current_segments_sdf = apply_schema_casting(current_segments_sdf, SilverTimeSegmentsDataObject.SCHEMA)
        current_segments_sdf = current_segments_sdf.repartition(
            *SilverTimeSegmentsDataObject.PARTITION_COLUMNS
        ).sortWithinPartitions(ColNames.user_id, ColNames.start_timestamp)

        self.output_data_objects[SilverTimeSegmentsDataObject.ID].df = current_segments_sdf

    @staticmethod
    def aggregate_segments(
        pdf: pd.DataFrame,
        current_date: date,
        min_time_stay: timedelta,
        max_time_missing_stay: timedelta,
        max_time_missing_move: timedelta,
        max_time_missing_abroad: timedelta,
        pad_time: timedelta,
    ) -> pd.DataFrame:
        """Aggregates user stays into continuous time segments based on given parameters.
        This function processes user location data and creates continuous time segments,
        taking into account various time-based parameters to determine segment boundaries and types.
        Args:
            pdf: DataFrame containing user location events.
            current_date: Date for which to generate segments.
            min_time_stay: Minimum duration required to consider a period as a stay.
            max_time_missing_stay: Maximum allowed gap in data while maintaining a stay segment.
            max_time_missing_move: Maximum allowed gap in data while maintaining a move segment.
            max_time_missing_abroad: Maximum allowed gap in data for abroad segments.
            pad_time: Time padding to add around segments.
        Returns:
            DataFrame containing aggregated time segments.
        """
        user_id, user_mod, mcc, mnc = ContinuousTimeSegmentation._get_user_metadata(pdf)

        # Prepare date boundaries
        current_date_start = datetime.combine(current_date, time(0, 0, 0))
        current_date_end = datetime.combine(current_date, time(23, 59, 59))

        # Check if there are any events for this date
        no_events_for_current_date = pdf[ColNames.timestamp].isna().all()
        no_previous_segments = pdf[ColNames.end_timestamp].isna().all()

        if no_events_for_current_date:
            # If no events, create a single UNKNOWN segment
            segments = ContinuousTimeSegmentation._handle_no_events_for_current_date(
                pdf, no_previous_segments, user_id, current_date_start, current_date_end, max_time_missing_abroad
            )
        else:
            # Create the initial time segment for this day
            current_ts = ContinuousTimeSegmentation._create_initial_time_segment(
                pdf,
                no_previous_segments,
                current_date_start,
                pad_time,
                user_id,
                max_time_missing_stay,
                max_time_missing_move,
                max_time_missing_abroad,
            )

            # Limit columns we actually need
            pdf_for_events = pdf[
                [ColNames.timestamp, ColNames.cell_id, ColNames.overlapping_cell_ids, "is_abroad_event", ColNames.plmn]
            ]

            # Build segments from each event
            segments = ContinuousTimeSegmentation._iterate_events(
                pdf_for_events,
                current_ts,
                user_id,
                min_time_stay,
                max_time_missing_stay,
                max_time_missing_move,
                max_time_missing_abroad,
                pad_time,
            )

        # Convert list of segments to DataFrame
        segments_df = pd.DataFrame(segments)
        segments_df[ColNames.user_id] = user_id
        segments_df[ColNames.mcc] = mcc
        segments_df[ColNames.mnc] = mnc
        segments_df[ColNames.user_id_modulo] = user_mod

        return segments_df

    # ---------------------  No-Events Helper  ---------------------
    @staticmethod
    def _handle_no_events_for_current_date(
        pdf: pd.DataFrame,
        no_previous_segments: bool,
        user_id: str,
        day_start: datetime,
        day_end: datetime,
        max_time_missing_abroad: timedelta,
    ) -> List[Dict]:
        """Handles cases where there are no events for the current date.
        This method creates a time segment for a day without events. If there were previous segments
        and the last segment was abroad within the maximum allowed time gap, it continues the abroad state.
        Otherwise, it creates an unknown state segment.
        Args:
            pdf (pd.DataFrame): DataFrame containing previous segments information
            no_previous_segments (bool): Flag indicating that there are no previous segments
            user_id (str): Identifier for the user
            day_start (datetime): Start timestamp of the day
            day_end (datetime): End timestamp of the day
            max_time_missing_abroad (timedelta): Maximum allowed time gap for continuing abroad state
        Returns:
            List[Dict]: List containing a single time segment dictionary with the appropriate state
        """
        if not no_previous_segments:
            previous_segment_end_time = pdf[ColNames.end_timestamp].iloc[0]
            previous_segment_state = pdf[ColNames.state].iloc[0]
            previous_segment_plmn = pdf["segment_plmn"].iloc[0]

            if (previous_segment_state == SegmentStates.ABROAD) and (
                day_end - previous_segment_end_time <= max_time_missing_abroad
            ):
                seg = ContinuousTimeSegmentation._create_time_segment(
                    day_start, day_end, [], previous_segment_plmn, SegmentStates.ABROAD, user_id
                )
            else:
                seg = ContinuousTimeSegmentation._create_time_segment(
                    day_start, day_end, [], None, SegmentStates.UNKNOWN, user_id
                )
        else:
            seg = ContinuousTimeSegmentation._create_time_segment(
                day_start, day_end, [], None, SegmentStates.UNKNOWN, user_id
            )

        seg[ColNames.is_last] = True
        return [seg]

    # ---------------------  Initial Segment Helper  ---------------------
    @staticmethod
    def _create_initial_time_segment(
        pdf: pd.DataFrame,
        no_previous_segments: bool,
        day_start: datetime,
        pad_time: timedelta,
        user_id: str,
        max_time_missing_stay: timedelta,
        max_time_missing_move: timedelta,
        max_time_missing_abroad: timedelta,
    ) -> Dict:
        """Create initial time segment based on first event and previous day information.
        Creates a time segment from the start of the day until the first event of the day,
        considering any existing segments from the previous day to maintain continuity.
        Args:
            pdf: DataFrame containing the first event data
            no_previous_segments: Boolean indicating that there are no segments from the previous day
            day_start: DateTime marking the start of the current day
            pad_time: TimeDelta for padding unknown segments
            user_id: String identifier for the user
            max_time_missing_stay: Maximum allowed gap for stay segments
            max_time_missing_move: Maximum allowed gap for move segments
            max_time_missing_abroad: Maximum allowed gap for abroad segments
        Returns:
            Dict containing the created time segment
        """
        first_event_time = pdf[ColNames.timestamp].iloc[0]
        previous_segment_end_time = pdf[ColNames.end_timestamp].iloc[0]
        previous_segment_state = pdf[ColNames.state].iloc[0]
        previous_segment_plmn = pdf["segment_plmn"].iloc[0]
        previous_segment_cells = pdf[ColNames.cells].iloc[0]

        time_to_first_event = first_event_time - day_start
        adjusted_pad = min(pad_time, time_to_first_event / 2)

        if no_previous_segments:
            # No segment from previous day => unknown until first event
            return ContinuousTimeSegmentation._create_time_segment(
                day_start,
                first_event_time - adjusted_pad,
                [],
                None,
                SegmentStates.UNKNOWN,
                user_id,
            )

        # Otherwise, try to continue from the previous day
        gap = first_event_time - previous_segment_end_time

        if (previous_segment_state == SegmentStates.STAY) and (gap <= max_time_missing_stay):
            return ContinuousTimeSegmentation._create_time_segment(
                day_start,
                first_event_time,
                previous_segment_cells,
                previous_segment_plmn,
                SegmentStates.STAY,
                user_id,
            )
        elif (previous_segment_state == SegmentStates.MOVE) and (gap <= max_time_missing_move):
            return ContinuousTimeSegmentation._create_time_segment(
                day_start,
                first_event_time,
                previous_segment_cells,
                previous_segment_plmn,
                SegmentStates.MOVE,
                user_id,
            )
        elif (previous_segment_state == SegmentStates.ABROAD) and (gap <= max_time_missing_abroad):
            return ContinuousTimeSegmentation._create_time_segment(
                day_start,
                first_event_time,
                [],
                previous_segment_plmn,
                SegmentStates.ABROAD,
                user_id,
            )
        else:
            # Large gap or incompatible => unknown until first event
            return ContinuousTimeSegmentation._create_time_segment(
                day_start,
                first_event_time - adjusted_pad,
                [],
                None,
                SegmentStates.UNKNOWN,
                user_id,
            )

    # ---------------------  Iteration Over Events ---------------------
    @staticmethod
    def _iterate_events(
        pdf_events: pd.DataFrame,
        current_ts: Dict,
        user_id: str,
        min_time_stay: timedelta,
        max_time_missing_stay: timedelta,
        max_time_missing_move: timedelta,
        max_time_missing_abroad: timedelta,
        pad_time: timedelta,
    ) -> List[Dict]:
        """Iterates through events and constructs time segments based on continuous time segmentation rules.

        Processes a sequence of events (both abroad and local) and creates time segments according to
        specified time constraints. Each event updates the current time segment state and may generate
        new segments when conditions are met.

        Args:
            pdf_events: DataFrame containing events with timestamp, location, and other relevant information.
            current_ts: Dictionary representing the current time segment state.
            user_id: String identifier for the user.
            min_time_stay: Minimum duration required for a stay segment.
            max_time_missing_stay: Maximum allowed gap in stay segments.
            max_time_missing_move: Maximum allowed gap in movement segments.
            max_time_missing_abroad: Maximum allowed gap in abroad segments.
            pad_time: Time padding added to segments.

        Returns:
            List of dictionaries representing time segments
        """
        all_segments: List[Dict] = []

        for event in pdf_events.itertuples(index=False):
            if event.is_abroad_event:
                # Process abroad logic
                new_segments, new_current = ContinuousTimeSegmentation._process_abroad_event(
                    current_ts,
                    user_id,
                    event.timestamp,
                    event.plmn,
                    max_time_missing_abroad,
                )
            else:
                # Process local logic
                new_segments, new_current = ContinuousTimeSegmentation._process_local_event(
                    current_ts,
                    user_id,
                    event.timestamp,
                    event.cell_id,
                    event.overlapping_cell_ids,
                    event.plmn,
                    min_time_stay,
                    max_time_missing_stay,
                    max_time_missing_move,
                    pad_time,
                )

            all_segments.extend(new_segments)
            current_ts = new_current

        # Mark final segment as is_last
        current_ts[ColNames.is_last] = True
        all_segments.append(current_ts)
        return all_segments

    @staticmethod
    def _extend_segment(current_ts: Dict, new_end_time: datetime, new_cells: List[Any] = None) -> Dict:
        """
        Returns a brand-new segment dictionary with an extended end_timestamp
        and optionally merged cells. Does not mutate the original.
        """
        updated_ts = current_ts.copy()
        updated_ts[ColNames.end_timestamp] = new_end_time

        if new_cells is not None:
            merged_cells = list(set(updated_ts[ColNames.cells] + new_cells))
            updated_ts[ColNames.cells] = merged_cells

        return updated_ts

    # ---------------------  Processing Each Event ---------------------
    @staticmethod
    def _process_abroad_event(
        current_ts: Dict,
        user_id: str,
        event_timestamp: datetime,
        event_plmn: str,
        max_time_missing_abroad: timedelta,
    ) -> Tuple[List[Dict], Dict]:
        """
        Decide whether to extend current ABROAD segment, create a new one,
        or start bridging with UNKNOWN if the gap is too large.
        Returns (finalized_segments, new_current_ts).
        """
        segments_to_add: List[Dict] = []

        abroad_mcc = str(event_plmn)[:3]
        current_mcc = str(current_ts.get(ColNames.plmn) or "")[:3]
        is_mcc_matched = abroad_mcc == current_mcc

        gap = event_timestamp - current_ts[ColNames.end_timestamp]

        if current_ts[ColNames.state] != SegmentStates.ABROAD:
            # Transition from a different state to ABROAD
            segments_to_add.append(current_ts)
            current_ts = ContinuousTimeSegmentation._create_time_segment(
                current_ts[ColNames.end_timestamp],
                event_timestamp,
                [],
                event_plmn,
                SegmentStates.ABROAD,
                user_id,
            )

        elif is_mcc_matched and (gap <= max_time_missing_abroad):
            # Extend existing ABROAD
            current_ts = ContinuousTimeSegmentation._extend_segment(current_ts, event_timestamp)

        elif (not is_mcc_matched) and (gap <= max_time_missing_abroad):
            # Different MCC but within the gap => new ABROAD segment
            segments_to_add.append(current_ts)
            current_ts = ContinuousTimeSegmentation._create_time_segment(
                current_ts[ColNames.end_timestamp],
                event_timestamp,
                [],
                event_plmn,
                SegmentStates.ABROAD,
                user_id,
            )

        else:
            # Gap too large => bridging with UNKNOWN
            segments_to_add.append(current_ts)
            current_ts = ContinuousTimeSegmentation._create_time_segment(
                current_ts[ColNames.end_timestamp],
                event_timestamp,
                [],
                event_plmn,
                SegmentStates.UNKNOWN,
                user_id,
            )

        return segments_to_add, current_ts

    @staticmethod
    def _process_local_event(
        current_ts: Dict,
        user_id: str,
        event_timestamp: datetime,
        event_cell: Any,
        overlapping_cell_ids: Any,
        event_plmn: Any,
        min_time_stay: timedelta,
        max_time_missing_stay: timedelta,
        max_time_missing_move: timedelta,
        pad_time: timedelta,
    ) -> Tuple[List[Dict], Dict]:
        """
        Decide whether to continue a STAY/UNDETERMINED, transition to MOVE,
        or insert UNKNOWN bridging based on the local event.
        Returns (finalized_segments, updated_current_ts).
        """
        segments_to_add: List[Dict] = []
        gap = event_timestamp - current_ts[ColNames.end_timestamp]
        if overlapping_cell_ids is None:
            overlapping_cell_ids = []
        new_cells = overlapping_cell_ids.tolist()
        new_cells.append(event_cell)

        is_intersected = ContinuousTimeSegmentation._check_intersection(
            current_ts[ColNames.cells],
            new_cells,
        )

        # Case 1: UNKNOWN/ABROAD => UNDETERMINED transition
        if current_ts[ColNames.state] in [SegmentStates.UNKNOWN, SegmentStates.ABROAD]:
            segments_to_add.append(current_ts)
            current_ts = ContinuousTimeSegmentation._create_time_segment(
                current_ts[ColNames.end_timestamp],
                event_timestamp,
                [event_cell],
                event_plmn,
                SegmentStates.UNDETERMINED,
                user_id,
            )

        # Case 2: Intersection => STAY or UNDETERMINED extension
        elif is_intersected and (gap <= max_time_missing_stay):
            if current_ts[ColNames.state] in [SegmentStates.UNDETERMINED, SegmentStates.STAY]:
                # Extend in place
                current_ts = ContinuousTimeSegmentation._extend_segment(current_ts, event_timestamp, [event_cell])
                duration = current_ts[ColNames.end_timestamp] - current_ts[ColNames.start_timestamp]
                if duration > min_time_stay:
                    current_ts[ColNames.state] = SegmentStates.STAY

            elif current_ts[ColNames.state] == SegmentStates.MOVE:
                # End MOVE => start UNDETERMINED
                segments_to_add.append(current_ts)
                current_ts = ContinuousTimeSegmentation._create_time_segment(
                    current_ts[ColNames.end_timestamp],
                    event_timestamp,
                    [event_cell],
                    event_plmn,
                    SegmentStates.UNDETERMINED,
                    user_id,
                )
        # Case 3: No intersection but gap <= max_time_missing_move => 'move'
        elif (not is_intersected) and (gap <= max_time_missing_move):

            midpoint = current_ts[ColNames.end_timestamp] + gap / 2
            move_ts_1 = ContinuousTimeSegmentation._create_time_segment(
                current_ts[ColNames.end_timestamp],
                midpoint,
                current_ts[ColNames.cells],
                event_plmn,
                SegmentStates.MOVE,
                user_id,
            )
            segments_to_add.extend([current_ts, move_ts_1])

            current_ts = ContinuousTimeSegmentation._create_time_segment(
                midpoint,
                event_timestamp,
                [event_cell],
                event_plmn,
                SegmentStates.MOVE,
                user_id,
            )

        # Case 4: Gap too large => bridging with UNKNOWN
        else:
            # First, artificially extend current_ts by pad_time
            extended_ts = ContinuousTimeSegmentation._extend_segment(
                current_ts, current_ts[ColNames.end_timestamp] + pad_time
            )

            unknown_segment = ContinuousTimeSegmentation._create_time_segment(
                extended_ts[ColNames.end_timestamp],
                event_timestamp - pad_time,
                [],
                event_plmn,
                SegmentStates.UNKNOWN,
                user_id,
            )

            segments_to_add.extend([extended_ts, unknown_segment])

            current_ts = ContinuousTimeSegmentation._create_time_segment(
                event_timestamp - pad_time,
                event_timestamp,
                [event_cell],
                event_plmn,
                SegmentStates.UNDETERMINED,
                user_id,
            )

        return segments_to_add, current_ts

    @staticmethod
    def _create_time_segment(
        start_timestamp: datetime,
        end_timestamp: datetime,
        cells: List[str],
        plmn: int,
        state: str,
        user_id: str,
    ) -> Dict:
        """
        Creates a new time segment.

        The segment's time_segment_id is an MD5 hex digest of the user_id concatenated
        with the start timestamp, so it is deterministic for a given user and start time.

        Parameters:
        start_timestamp (datetime): The start timestamp of the time segment.
        end_timestamp (datetime): The end timestamp of the time segment.
        cells (List[str]): The cells of the time segment.
        plmn (int): The PLMN code associated with the time segment, if any.
        state (str): The state of the time segment.
        user_id (str): The identifier of the user the segment belongs to.

        Returns:
        Dict: The new time segment.
        """
        segment_id_string = f"{user_id}{start_timestamp}"
        return {
            ColNames.time_segment_id: hashlib.md5(segment_id_string.encode()).hexdigest(),
            ColNames.start_timestamp: start_timestamp,
            ColNames.end_timestamp: end_timestamp,
            ColNames.cells: cells,
            ColNames.plmn: plmn,
            ColNames.state: state,
            ColNames.is_last: False,
        }

    @staticmethod
    def _get_user_metadata(pdf: pd.DataFrame) -> Tuple[str, int, int, str]:
        """
        Gets user_id, user_id_modulo, mcc, mnc from Pandas DataFrame containing columns with the corresponding names.
        Values from the first row of the dataframe are used.

        Args:
            pdf (pd.DataFrame): Pandas DataFrame

        Returns:
            Tuple[str, int, int, str]: user_id, user_id_modulo, mcc, mnc
        """
        user_id = pdf[ColNames.user_id][0]
        user_id_mod = pdf[ColNames.user_id_modulo][0]
        mcc = pdf[ColNames.mcc][0]
        mnc = pdf[ColNames.mnc][0]
        return user_id, user_id_mod, mcc, mnc

    @staticmethod
    def _check_intersection(
        previous_ts_cells: List[str],
        current_event_overlapping_cell_ids: List[str],
    ) -> bool:
        """
        Checks if there is an intersection between the existing time segment and the current event.

        This method takes two lists of cells, one for the cells included in the existing time segment and the other for
        the overlapping cell ids of the current event's cell.
        The time segment intersects with the event if each of the time segment's cells are included in the event's overlapping cell ids list.

        A segment with no cells cannot intersect and returns False.

        Parameters:
        previous_ts_cells (List[str]): The cells of the existing time segment.
        current_event_overlapping_cell_ids (List[str]): Cells the current event's cell overlaps with, including itself.

        Returns:
        bool: True if there is an intersection, False otherwise.
        """
        if len(previous_ts_cells) == 0:
            is_intersected = False
        else:
            is_intersected = set(previous_ts_cells).issubset(set(current_event_overlapping_cell_ids))
        return is_intersected

_check_intersection(previous_ts_cells, current_event_overlapping_cell_ids) staticmethod

Checks if there is an intersection between the existing time segment and the current event.

This method takes two lists of cells, one for the cells included in the existing time segment and the other for the overlapping cell ids of the current event's cell. The time segment intersects with the event if each of the time segment's cells are included in the event's overlapping cell ids list.

A segment with no cells cannot intersect and returns False.

Parameters:
    previous_ts_cells (List[str]): The cells of the existing time segment.
    current_event_overlapping_cell_ids (List[str]): Cells the current event's cell overlaps with, including itself.

Returns:
    bool: True if there is an intersection, False otherwise.

Source code in multimno/components/execution/time_segments/continuous_time_segmentation.py
@staticmethod
def _check_intersection(
    previous_ts_cells: List[str],
    current_event_overlapping_cell_ids: List[str],
) -> bool:
    """
    Checks if there is an intersection between the existing time segment and the current event.

    This method takes two lists of cells, one for the cells included in the existing time segment and the other for
    the overlapping cell ids of the current event's cell.
    The time segment intersects with the event if each of the time segment's cells are included in the event's overlapping cell ids list.

    A segment with no cells cannot intersect and returns False.

    Parameters:
    previous_ts_cells (List[str]): The cells of the existing time segment.
    current_event_overlapping_cell_ids (List[str]): Cells the current event's cell overlaps with, including itself.

    Returns:
    bool: True if there is an intersection, False otherwise.
    """
    if len(previous_ts_cells) == 0:
        is_intersected = False
    else:
        is_intersected = set(previous_ts_cells).issubset(set(current_event_overlapping_cell_ids))
    return is_intersected

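To make the subset rule concrete, here is a small illustration with invented cell ids; a segment intersects an event only when every one of its cells appears in the event's overlapping-cell list.

# Invented cell ids, purely for illustration.
segment_cells = ["A1", "A2"]

ContinuousTimeSegmentation._check_intersection(segment_cells, ["A1", "A2", "A3"])  # True: both cells covered
ContinuousTimeSegmentation._check_intersection(segment_cells, ["A1", "B7"])        # False: A2 not covered
ContinuousTimeSegmentation._check_intersection([], ["A1"])                         # False: empty segment never intersects
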
_create_initial_time_segment(pdf, no_previous_segments, day_start, pad_time, user_id, max_time_missing_stay, max_time_missing_move, max_time_missing_abroad) staticmethod

Create initial time segment based on first event and previous day information. Creates a time segment from the start of the day until the first event of the day, considering any existing segments from the previous day to maintain continuity.

Args:
    pdf: DataFrame containing the first event data
    no_previous_segments: Boolean indicating that there are no segments from the previous day
    day_start: DateTime marking the start of the current day
    pad_time: TimeDelta for padding unknown segments
    user_id: String identifier for the user
    max_time_missing_stay: Maximum allowed gap for stay segments
    max_time_missing_move: Maximum allowed gap for move segments
    max_time_missing_abroad: Maximum allowed gap for abroad segments

Returns:
    Dict containing the created time segment

Source code in multimno/components/execution/time_segments/continuous_time_segmentation.py
@staticmethod
def _create_initial_time_segment(
    pdf: pd.DataFrame,
    no_previous_segments: bool,
    day_start: datetime,
    pad_time: timedelta,
    user_id: str,
    max_time_missing_stay: timedelta,
    max_time_missing_move: timedelta,
    max_time_missing_abroad: timedelta,
) -> Dict:
    """Create initial time segment based on first event and previous day information.
    Creates a time segment from the start of the day until the first event of the day,
    considering any existing segments from the previous day to maintain continuity.
    Args:
        pdf: DataFrame containing the first event data
        no_previous_segments: Boolean indicating that there are no segments from the previous day
        day_start: DateTime marking the start of the current day
        pad_time: TimeDelta for padding unknown segments
        user_id: String identifier for the user
        max_time_missing_stay: Maximum allowed gap for stay segments
        max_time_missing_move: Maximum allowed gap for move segments
        max_time_missing_abroad: Maximum allowed gap for abroad segments
    Returns:
        Dict containing the created time segment
    """
    first_event_time = pdf[ColNames.timestamp].iloc[0]
    previous_segment_end_time = pdf[ColNames.end_timestamp].iloc[0]
    previous_segment_state = pdf[ColNames.state].iloc[0]
    previous_segment_plmn = pdf["segment_plmn"].iloc[0]
    previous_segment_cells = pdf[ColNames.cells].iloc[0]

    time_to_first_event = first_event_time - day_start
    adjusted_pad = min(pad_time, time_to_first_event / 2)

    if no_previous_segments:
        # No segment from previous day => unknown until first event
        return ContinuousTimeSegmentation._create_time_segment(
            day_start,
            first_event_time - adjusted_pad,
            [],
            None,
            SegmentStates.UNKNOWN,
            user_id,
        )

    # Otherwise, try to continue from the previous day
    gap = first_event_time - previous_segment_end_time

    if (previous_segment_state == SegmentStates.STAY) and (gap <= max_time_missing_stay):
        return ContinuousTimeSegmentation._create_time_segment(
            day_start,
            first_event_time,
            previous_segment_cells,
            previous_segment_plmn,
            SegmentStates.STAY,
            user_id,
        )
    elif (previous_segment_state == SegmentStates.MOVE) and (gap <= max_time_missing_move):
        return ContinuousTimeSegmentation._create_time_segment(
            day_start,
            first_event_time,
            previous_segment_cells,
            previous_segment_plmn,
            SegmentStates.MOVE,
            user_id,
        )
    elif (previous_segment_state == SegmentStates.ABROAD) and (gap <= max_time_missing_abroad):
        return ContinuousTimeSegmentation._create_time_segment(
            day_start,
            first_event_time,
            [],
            previous_segment_plmn,
            SegmentStates.ABROAD,
            user_id,
        )
    else:
        # Large gap or incompatible => unknown until first event
        return ContinuousTimeSegmentation._create_time_segment(
            day_start,
            first_event_time - adjusted_pad,
            [],
            None,
            SegmentStates.UNKNOWN,
            user_id,
        )

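A quick numeric illustration of the padding rule above, with invented times: the pad applied before the first event is capped at half the distance between the start of the day and that event.

from datetime import datetime, timedelta

day_start = datetime(2023, 1, 1, 0, 0, 0)
pad_time = timedelta(seconds=600)

# First event well into the day: the full 10-minute pad is used.
first_event_time = datetime(2023, 1, 1, 8, 0, 0)
adjusted_pad = min(pad_time, (first_event_time - day_start) / 2)  # 0:10:00

# First event 10 minutes after midnight: the pad shrinks to half that gap.
first_event_time = datetime(2023, 1, 1, 0, 10, 0)
adjusted_pad = min(pad_time, (first_event_time - day_start) / 2)  # 0:05:00
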
_create_time_segment(start_timestamp, end_timestamp, cells, plmn, state, user_id) staticmethod

Creates a new time segment.

The segment's time_segment_id is an MD5 hex digest of the user_id concatenated with the start timestamp, so it is deterministic for a given user and start time.

Parameters:
    start_timestamp (datetime): The start timestamp of the time segment.
    end_timestamp (datetime): The end timestamp of the time segment.
    cells (List[str]): The cells of the time segment.
    plmn (int): The PLMN code associated with the time segment, if any.
    state (str): The state of the time segment.
    user_id (str): The identifier of the user the segment belongs to.

Returns:
    Dict: The new time segment.

Source code in multimno/components/execution/time_segments/continuous_time_segmentation.py
@staticmethod
def _create_time_segment(
    start_timestamp: datetime,
    end_timestamp: datetime,
    cells: List[str],
    plmn: int,
    state: str,
    user_id: str,
) -> Dict:
    """
    Creates a new time segment.

    The segment's time_segment_id is an MD5 hex digest of the user_id concatenated
    with the start timestamp, so it is deterministic for a given user and start time.

    Parameters:
    start_timestamp (datetime): The start timestamp of the time segment.
    end_timestamp (datetime): The end timestamp of the time segment.
    cells (List[str]): The cells of the time segment.
    plmn (int): The PLMN code associated with the time segment, if any.
    state (str): The state of the time segment.
    user_id (str): The identifier of the user the segment belongs to.

    Returns:
    Dict: The new time segment.
    """
    segment_id_string = f"{user_id}{start_timestamp}"
    return {
        ColNames.time_segment_id: hashlib.md5(segment_id_string.encode()).hexdigest(),
        ColNames.start_timestamp: start_timestamp,
        ColNames.end_timestamp: end_timestamp,
        ColNames.cells: cells,
        ColNames.plmn: plmn,
        ColNames.state: state,
        ColNames.is_last: False,
    }

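Because the identifier is just an MD5 hex digest of the user id concatenated with the start timestamp, it can be recomputed outside the component; the values below are invented.

import hashlib
from datetime import datetime

user_id = "A1B2C3"  # hex-encoded user id, as used inside aggregate_segments
start_timestamp = datetime(2023, 1, 1, 8, 0, 0)

time_segment_id = hashlib.md5(f"{user_id}{start_timestamp}".encode()).hexdigest()
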
_extend_segment(current_ts, new_end_time, new_cells=None) staticmethod

Returns a brand-new segment dictionary with an extended end_timestamp and optionally merged cells. Does not mutate the original.

Source code in multimno/components/execution/time_segments/continuous_time_segmentation.py
@staticmethod
def _extend_segment(current_ts: Dict, new_end_time: datetime, new_cells: List[Any] = None) -> Dict:
    """
    Returns a brand-new segment dictionary with an extended end_timestamp
    and optionally merged cells. Does not mutate the original.
    """
    updated_ts = current_ts.copy()
    updated_ts[ColNames.end_timestamp] = new_end_time

    if new_cells is not None:
        merged_cells = list(set(updated_ts[ColNames.cells] + new_cells))
        updated_ts[ColNames.cells] = merged_cells

    return updated_ts

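For illustration (invented values, and assuming ColNames is imported from the package's constants): extending a segment yields a copy with the later end time and a de-duplicated union of cells, while the input dictionary keeps its original end_timestamp and cells.

from datetime import datetime

segment = {
    ColNames.start_timestamp: datetime(2023, 1, 1, 8, 0, 0),
    ColNames.end_timestamp: datetime(2023, 1, 1, 8, 30, 0),
    ColNames.cells: ["A1"],
}

extended = ContinuousTimeSegmentation._extend_segment(segment, datetime(2023, 1, 1, 9, 0, 0), ["A1", "A2"])
# extended[ColNames.cells] == ["A1", "A2"] up to ordering (set-based merge removes duplicates)
# segment[ColNames.end_timestamp] and segment[ColNames.cells] are unchanged
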
_get_user_metadata(pdf) staticmethod

Gets user_id, user_id_modulo, mcc, mnc from Pandas DataFrame containing columns with the corresponding names. Values from the first row of the dataframe are used.

Parameters:
    pdf (DataFrame): Pandas DataFrame. Required.

Returns:
    Tuple[str, int, int, str]: user_id, user_id_modulo, mcc, mnc

Source code in multimno/components/execution/time_segments/continuous_time_segmentation.py
@staticmethod
def _get_user_metadata(pdf: pd.DataFrame) -> Tuple[str, int, int, str]:
    """
    Gets user_id, user_id_modulo, mcc, mnc from Pandas DataFrame containing columns with the corresponding names.
    Values from the first row of the dataframe are used.

    Args:
        pdf (pd.DataFrame): Pandas DataFrame

    Returns:
        Tuple[str, int, int, str]: user_id, user_id_modulo, mcc, mnc
    """
    user_id = pdf[ColNames.user_id][0]
    user_id_mod = pdf[ColNames.user_id_modulo][0]
    mcc = pdf[ColNames.mcc][0]
    mnc = pdf[ColNames.mnc][0]
    return user_id, user_id_mod, mcc, mnc

_handle_no_events_for_current_date(pdf, no_previous_segments, user_id, day_start, day_end, max_time_missing_abroad) staticmethod

Handles cases where there are no events for the current date. This method creates a time segment for a day without events. If there were previous segments and the last segment was abroad within the maximum allowed time gap, it continues the abroad state. Otherwise, it creates an unknown state segment.

Args:
    pdf (pd.DataFrame): DataFrame containing previous segments information
    no_previous_segments (bool): Flag indicating that there are no previous segments
    user_id (str): Identifier for the user
    day_start (datetime): Start timestamp of the day
    day_end (datetime): End timestamp of the day
    max_time_missing_abroad (timedelta): Maximum allowed time gap for continuing abroad state

Returns:
    List[Dict]: List containing a single time segment dictionary with the appropriate state

Source code in multimno/components/execution/time_segments/continuous_time_segmentation.py
@staticmethod
def _handle_no_events_for_current_date(
    pdf: pd.DataFrame,
    no_previous_segments: bool,
    user_id: str,
    day_start: datetime,
    day_end: datetime,
    max_time_missing_abroad: timedelta,
) -> List[Dict]:
    """Handles cases where there are no events for the current date.
    This method creates a time segment for a day without events. If there were previous segments
    and the last segment was abroad within the maximum allowed time gap, it continues the abroad state.
    Otherwise, it creates an unknown state segment.
    Args:
        pdf (pd.DataFrame): DataFrame containing previous segments information
        no_previous_segments (bool): Flag indicating that there are no previous segments
        user_id (str): Identifier for the user
        day_start (datetime): Start timestamp of the day
        day_end (datetime): End timestamp of the day
        max_time_missing_abroad (timedelta): Maximum allowed time gap for continuing abroad state
    Returns:
        List[Dict]: List containing a single time segment dictionary with the appropriate state
    """
    if not no_previous_segments:
        previous_segment_end_time = pdf[ColNames.end_timestamp].iloc[0]
        previous_segment_state = pdf[ColNames.state].iloc[0]
        previous_segment_plmn = pdf["segment_plmn"].iloc[0]

        if (previous_segment_state == SegmentStates.ABROAD) and (
            day_end - previous_segment_end_time <= max_time_missing_abroad
        ):
            seg = ContinuousTimeSegmentation._create_time_segment(
                day_start, day_end, [], previous_segment_plmn, SegmentStates.ABROAD, user_id
            )
        else:
            seg = ContinuousTimeSegmentation._create_time_segment(
                day_start, day_end, [], None, SegmentStates.UNKNOWN, user_id
            )
    else:
        seg = ContinuousTimeSegmentation._create_time_segment(
            day_start, day_end, [], None, SegmentStates.UNKNOWN, user_id
        )

    seg[ColNames.is_last] = True
    return [seg]

_iterate_events(pdf_events, current_ts, user_id, min_time_stay, max_time_missing_stay, max_time_missing_move, max_time_missing_abroad, pad_time) staticmethod

Iterates through events and constructs time segments based on continuous time segmentation rules.

Processes a sequence of events (both abroad and local) and creates time segments according to specified time constraints. Each event updates the current time segment state and may generate new segments when conditions are met.

Parameters:
    pdf_events (DataFrame): DataFrame containing events with timestamp, location, and other relevant information. Required.
    current_ts (Dict): Dictionary representing the current time segment state. Required.
    user_id (str): String identifier for the user. Required.
    min_time_stay (timedelta): Minimum duration required for a stay segment. Required.
    max_time_missing_stay (timedelta): Maximum allowed gap in stay segments. Required.
    max_time_missing_move (timedelta): Maximum allowed gap in movement segments. Required.
    max_time_missing_abroad (timedelta): Maximum allowed gap in abroad segments. Required.
    pad_time (timedelta): Time padding added to segments. Required.

Returns:
    List[Dict]: List of dictionaries representing time segments.

Source code in multimno/components/execution/time_segments/continuous_time_segmentation.py
@staticmethod
def _iterate_events(
    pdf_events: pd.DataFrame,
    current_ts: Dict,
    user_id: str,
    min_time_stay: timedelta,
    max_time_missing_stay: timedelta,
    max_time_missing_move: timedelta,
    max_time_missing_abroad: timedelta,
    pad_time: timedelta,
) -> List[Dict]:
    """Iterates through events and constructs time segments based on continuous time segmentation rules.

    Processes a sequence of events (both abroad and local) and creates time segments according to
    specified time constraints. Each event updates the current time segment state and may generate
    new segments when conditions are met.

    Args:
        pdf_events: DataFrame containing events with timestamp, location, and other relevant information.
        current_ts: Dictionary representing the current time segment state.
        user_id: String identifier for the user.
        min_time_stay: Minimum duration required for a stay segment.
        max_time_missing_stay: Maximum allowed gap in stay segments.
        max_time_missing_move: Maximum allowed gap in movement segments.
        max_time_missing_abroad: Maximum allowed gap in abroad segments.
        pad_time: Time padding added to segments.

    Returns:
        List of dictionaries representing time segments
    """
    all_segments: List[Dict] = []

    for event in pdf_events.itertuples(index=False):
        if event.is_abroad_event:
            # Process abroad logic
            new_segments, new_current = ContinuousTimeSegmentation._process_abroad_event(
                current_ts,
                user_id,
                event.timestamp,
                event.plmn,
                max_time_missing_abroad,
            )
        else:
            # Process local logic
            new_segments, new_current = ContinuousTimeSegmentation._process_local_event(
                current_ts,
                user_id,
                event.timestamp,
                event.cell_id,
                event.overlapping_cell_ids,
                event.plmn,
                min_time_stay,
                max_time_missing_stay,
                max_time_missing_move,
                pad_time,
            )

        all_segments.extend(new_segments)
        current_ts = new_current

    # Mark final segment as is_last
    current_ts[ColNames.is_last] = True
    all_segments.append(current_ts)
    return all_segments

_process_abroad_event(current_ts, user_id, event_timestamp, event_plmn, max_time_missing_abroad) staticmethod

Decide whether to extend current ABROAD segment, create a new one, or start bridging with UNKNOWN if the gap is too large. Returns (finalized_segments, new_current_ts).

Source code in multimno/components/execution/time_segments/continuous_time_segmentation.py
@staticmethod
def _process_abroad_event(
    current_ts: Dict,
    user_id: str,
    event_timestamp: datetime,
    event_plmn: str,
    max_time_missing_abroad: timedelta,
) -> Tuple[List[Dict], Dict]:
    """
    Decide whether to extend current ABROAD segment, create a new one,
    or start bridging with UNKNOWN if the gap is too large.
    Returns (finalized_segments, new_current_ts).
    """
    segments_to_add: List[Dict] = []

    abroad_mcc = str(event_plmn)[:3]
    current_mcc = str(current_ts.get(ColNames.plmn) or "")[:3]
    is_mcc_matched = abroad_mcc == current_mcc

    gap = event_timestamp - current_ts[ColNames.end_timestamp]

    if current_ts[ColNames.state] != SegmentStates.ABROAD:
        # Transition from a different state to ABROAD
        segments_to_add.append(current_ts)
        current_ts = ContinuousTimeSegmentation._create_time_segment(
            current_ts[ColNames.end_timestamp],
            event_timestamp,
            [],
            event_plmn,
            SegmentStates.ABROAD,
            user_id,
        )

    elif is_mcc_matched and (gap <= max_time_missing_abroad):
        # Extend existing ABROAD
        current_ts = ContinuousTimeSegmentation._extend_segment(current_ts, event_timestamp)

    elif (not is_mcc_matched) and (gap <= max_time_missing_abroad):
        # Different MCC but within the gap => new ABROAD segment
        segments_to_add.append(current_ts)
        current_ts = ContinuousTimeSegmentation._create_time_segment(
            current_ts[ColNames.end_timestamp],
            event_timestamp,
            [],
            event_plmn,
            SegmentStates.ABROAD,
            user_id,
        )

    else:
        # Gap too large => bridging with UNKNOWN
        segments_to_add.append(current_ts)
        current_ts = ContinuousTimeSegmentation._create_time_segment(
            current_ts[ColNames.end_timestamp],
            event_timestamp,
            [],
            event_plmn,
            SegmentStates.UNKNOWN,
            user_id,
        )

    return segments_to_add, current_ts
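The branch order above can be summarised as: leave a non-ABROAD state, extend when the MCC matches and the gap fits the threshold, open a new ABROAD segment when the MCC differs but the gap still fits, and fall back to UNKNOWN bridging when the gap is too large. A self-contained sketch of that branch selection follows; the segment fields, lowercase state strings, and the helper itself are simplified stand-ins, and only the str(plmn)[:3] MCC slice mirrors the method above.

from datetime import datetime, timedelta


def abroad_branch(
    current_state: str,
    current_plmn: str,
    current_end: datetime,
    event_plmn: str,
    event_ts: datetime,
    max_time_missing_abroad: timedelta,
) -> str:
    gap = event_ts - current_end
    same_mcc = str(event_plmn)[:3] == str(current_plmn or "")[:3]
    if current_state != "abroad":
        return "new_abroad"      # transition from a non-ABROAD state
    if same_mcc and gap <= max_time_missing_abroad:
        return "extend"          # same country, small gap => extend ABROAD
    if gap <= max_time_missing_abroad:
        return "new_abroad"      # different country, small gap => new ABROAD segment
    return "unknown_bridge"      # gap too large => bridge with UNKNOWN


print(abroad_branch("abroad", "20801", datetime(2023, 1, 1, 10),
                    "20810", datetime(2023, 1, 1, 11), timedelta(hours=2)))
# extend -- both PLMNs share MCC 208 and the one-hour gap fits the threshold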

_process_local_event(current_ts, user_id, event_timestamp, event_cell, overlapping_cell_ids, event_plmn, min_time_stay, max_time_missing_stay, max_time_missing_move, pad_time) staticmethod

Decide whether to continue a STAY/UNDETERMINED, transition to MOVE, or insert UNKNOWN bridging based on the local event. Returns (finalized_segments, updated_current_ts).

Source code in multimno/components/execution/time_segments/continuous_time_segmentation.py
@staticmethod
def _process_local_event(
    current_ts: Dict,
    user_id: str,
    event_timestamp: datetime,
    event_cell: Any,
    overlapping_cell_ids: Any,
    event_plmn: Any,
    min_time_stay: timedelta,
    max_time_missing_stay: timedelta,
    max_time_missing_move: timedelta,
    pad_time: timedelta,
) -> Tuple[List[Dict], Dict]:
    """
    Decide whether to continue a STAY/UNDETERMINED, transition to MOVE,
    or insert UNKNOWN bridging based on the local event.
    Returns (finalized_segments, updated_current_ts).
    """
    segments_to_add: List[Dict] = []
    gap = event_timestamp - current_ts[ColNames.end_timestamp]
    if overlapping_cell_ids is None:
        overlapping_cell_ids = []
    # list() accepts both the array-like column value and the empty-list fallback above,
    # whereas .tolist() would raise AttributeError on a plain list
    new_cells = list(overlapping_cell_ids)
    new_cells.append(event_cell)

    is_intersected = ContinuousTimeSegmentation._check_intersection(
        current_ts[ColNames.cells],
        new_cells,
    )

    # Case 1: UNKNOWN/ABROAD => UNDETERMINED transition
    if current_ts[ColNames.state] in [SegmentStates.UNKNOWN, SegmentStates.ABROAD]:
        segments_to_add.append(current_ts)
        current_ts = ContinuousTimeSegmentation._create_time_segment(
            current_ts[ColNames.end_timestamp],
            event_timestamp,
            [event_cell],
            event_plmn,
            SegmentStates.UNDETERMINED,
            user_id,
        )

    # Case 2: Intersection => STAY or UNDETERMINED extension
    elif is_intersected and (gap <= max_time_missing_stay):
        if current_ts[ColNames.state] in [SegmentStates.UNDETERMINED, SegmentStates.STAY]:
            # Extend in place
            current_ts = ContinuousTimeSegmentation._extend_segment(current_ts, event_timestamp, [event_cell])
            duration = current_ts[ColNames.end_timestamp] - current_ts[ColNames.start_timestamp]
            if duration > min_time_stay:
                current_ts[ColNames.state] = SegmentStates.STAY

        elif current_ts[ColNames.state] == SegmentStates.MOVE:
            # End MOVE => start UNDETERMINED
            segments_to_add.append(current_ts)
            current_ts = ContinuousTimeSegmentation._create_time_segment(
                current_ts[ColNames.end_timestamp],
                event_timestamp,
                [event_cell],
                event_plmn,
                SegmentStates.UNDETERMINED,
                user_id,
            )
    # Case 3: No intersection but gap <= max_time_missing_move => MOVE
    elif (not is_intersected) and (gap <= max_time_missing_move):
        midpoint = current_ts[ColNames.end_timestamp] + gap / 2
        move_ts_1 = ContinuousTimeSegmentation._create_time_segment(
            current_ts[ColNames.end_timestamp],
            midpoint,
            current_ts[ColNames.cells],
            event_plmn,
            SegmentStates.MOVE,
            user_id,
        )
        segments_to_add.extend([current_ts, move_ts_1])

        current_ts = ContinuousTimeSegmentation._create_time_segment(
            midpoint,
            event_timestamp,
            [event_cell],
            event_plmn,
            SegmentStates.MOVE,
            user_id,
        )

    # Case 4: Gap too large => bridging with UNKNOWN
    else:
        # First, artificially extend current_ts by pad_time
        extended_ts = ContinuousTimeSegmentation._extend_segment(
            current_ts, current_ts[ColNames.end_timestamp] + pad_time
        )

        unknown_segment = ContinuousTimeSegmentation._create_time_segment(
            extended_ts[ColNames.end_timestamp],
            event_timestamp - pad_time,
            [],
            event_plmn,
            SegmentStates.UNKNOWN,
            user_id,
        )

        segments_to_add.extend([extended_ts, unknown_segment])

        current_ts = ContinuousTimeSegmentation._create_time_segment(
            event_timestamp - pad_time,
            event_timestamp,
            [event_cell],
            event_plmn,
            SegmentStates.UNDETERMINED,
            user_id,
        )

    return segments_to_add, current_ts
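Case 3 above splits the gap between the end of the current segment and the new event into two MOVE segments that meet at the midpoint. A quick numeric illustration of that split with plain datetimes, independent of any project code:

from datetime import datetime

current_end = datetime(2023, 1, 1, 10, 0)
event_ts = datetime(2023, 1, 1, 10, 40)

gap = event_ts - current_end
midpoint = current_end + gap / 2

print(midpoint)  # 2023-01-01 10:20:00
# MOVE segment 1 covers current_end -> midpoint with the previous segment's cells,
# MOVE segment 2 covers midpoint   -> event_ts  with the new cell,
# mirroring move_ts_1 and the new current_ts created in Case 3.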

aggregate_segments(pdf, current_date, min_time_stay, max_time_missing_stay, max_time_missing_move, max_time_missing_abroad, pad_time) staticmethod

Aggregates user stays into continuous time segments based on given parameters. This function processes user location data and creates continuous time segments, taking into account various time-based parameters to determine segment boundaries and types.

Args:
    pdf: DataFrame containing user location events.
    current_date: Date for which to generate segments.
    min_time_stay: Minimum duration required to consider a period as a stay.
    max_time_missing_stay: Maximum allowed gap in data while maintaining a stay segment.
    max_time_missing_move: Maximum allowed gap in data while maintaining a move segment.
    max_time_missing_abroad: Maximum allowed gap in data for abroad segments.
    pad_time: Time padding to add around segments.

Returns:
    DataFrame containing aggregated time segments.

Source code in multimno/components/execution/time_segments/continuous_time_segmentation.py
@staticmethod
def aggregate_segments(
    pdf: pd.DataFrame,
    current_date: date,
    min_time_stay: timedelta,
    max_time_missing_stay: timedelta,
    max_time_missing_move: timedelta,
    max_time_missing_abroad: timedelta,
    pad_time: timedelta,
) -> pd.DataFrame:
    """Aggregates user stays into continuous time segments based on given parameters.
    This function processes user location data and creates continuous time segments,
    taking into account various time-based parameters to determine segment boundaries and types.
    Args:
        pdf: DataFrame containing user location events.
        current_date: Date for which to generate segments.
        min_time_stay: Minimum duration required to consider a period as a stay.
        max_time_missing_stay: Maximum allowed gap in data while maintaining a stay segment.
        max_time_missing_move: Maximum allowed gap in data while maintaining a move segment.
        max_time_missing_abroad: Maximum allowed gap in data for abroad segments.
        pad_time: Time padding to add around segments.
    Returns:
        DataFrame containing aggregated time segments.
    """
    user_id, user_mod, mcc, mnc = ContinuousTimeSegmentation._get_user_metadata(pdf)

    # Prepare date boundaries
    current_date_start = datetime.combine(current_date, time(0, 0, 0))
    current_date_end = datetime.combine(current_date, time(23, 59, 59))

    # Check if there are any events for this date
    no_events_for_current_date = pdf[ColNames.timestamp].isna().all()
    no_previous_segments = pdf[ColNames.end_timestamp].isna().all()

    if no_events_for_current_date:
        # If no events, create a single UNKNOWN segment
        segments = ContinuousTimeSegmentation._handle_no_events_for_current_date(
            pdf, no_previous_segments, user_id, current_date_start, current_date_end, max_time_missing_abroad
        )
    else:
        # Create the initial time segment for this day
        current_ts = ContinuousTimeSegmentation._create_initial_time_segment(
            pdf,
            no_previous_segments,
            current_date_start,
            pad_time,
            user_id,
            max_time_missing_stay,
            max_time_missing_move,
            max_time_missing_abroad,
        )

        # Limit columns we actually need
        pdf_for_events = pdf[
            [ColNames.timestamp, ColNames.cell_id, ColNames.overlapping_cell_ids, "is_abroad_event", ColNames.plmn]
        ]

        # Build segments from each event
        segments = ContinuousTimeSegmentation._iterate_events(
            pdf_for_events,
            current_ts,
            user_id,
            min_time_stay,
            max_time_missing_stay,
            max_time_missing_move,
            max_time_missing_abroad,
            pad_time,
        )

    # Convert list of segments to DataFrame
    segments_df = pd.DataFrame(segments)
    segments_df[ColNames.user_id] = user_id
    segments_df[ColNames.mcc] = mcc
    segments_df[ColNames.mnc] = mnc
    segments_df[ColNames.user_id_modulo] = user_mod

    return segments_df
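aggregate_segments consumes the events of a single user for a single date and returns that user's segments, so a caller has to apply it per user group; in the actual pipeline this is presumably driven through Spark's grouped-map API, which is not shown on this page. The sketch below is only a local pandas analogue of that grouped-apply pattern, with a toy aggregation and illustrative column names standing in for the real function and output schema.

from datetime import datetime
from typing import List

import pandas as pd


def toy_aggregate(pdf: pd.DataFrame) -> pd.DataFrame:
    """Stand-in with the same shape as aggregate_segments: one user's events in, segments out."""
    segments: List[dict] = [
        {"start_timestamp": pdf["timestamp"].min(), "end_timestamp": pdf["timestamp"].max()}
    ]
    out = pd.DataFrame(segments)
    out["user_id"] = pdf["user_id"].iloc[0]
    return out


events = pd.DataFrame(
    {
        "user_id": ["a", "a", "b"],
        "timestamp": [datetime(2023, 1, 1, 8), datetime(2023, 1, 1, 9), datetime(2023, 1, 1, 10)],
    }
)

# One output row per segment, grouped by user, mirroring how a per-user
# aggregation like aggregate_segments is typically invoked.
segments = events.groupby("user_id", group_keys=False).apply(toy_aggregate)
print(segments)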