Skip to content

component

Module that defines the abstract pipeline component class

Component

Class that models a pipeline component.

Source code in multimno/core/component.py (lines 18–90)
class Component(metaclass=ABCMeta):
    """
    Class that models a pipeline component.

    A component is built from a general and a component-specific configuration file and is run
    through ``execute``, which chains ``read``, ``transform`` and ``write``. Subclasses must
    implement ``initalize_data_objects`` and ``transform``.

    Attributes:
        COMPONENT_ID: Identifier passed to the logger factory and used in the start/finish log
            messages. It is None on this base class; presumably each subclass overrides it.
    """

    COMPONENT_ID: str = None

    def __init__(self, general_config_path: str, component_config_path: str) -> None:
        """
        Build the configuration, logger and Spark session, then set up the data objects.

        Args:
            general_config_path: Path to the general configuration file.
            component_config_path: Path to the configuration file of this component.
        """
        # Both stay None unless initalize_data_objects() assigns them
        self.input_data_objects: Dict[str, DataObject] = None
        self.output_data_objects: Dict[str, DataObject] = None

        # Initialize variables
        self.config: ConfigParser = parse_configuration(general_config_path, component_config_path)
        self.logger: Logger = generate_logger(self.config, self.COMPONENT_ID)
        self.spark: SparkSession = generate_spark_session(self.config)
        # Called last so that implementations can rely on config, logger and spark being set
        self.initalize_data_objects()

        # Log configuration
        self.log_config()

    @abstractmethod
    def initalize_data_objects(self):
        """
        Method that initializes the data objects associated with the component.

        Called from ``__init__``. ``read`` and ``write`` iterate over ``self.input_data_objects``
        and ``self.output_data_objects``, so implementations are expected to assign them here.
        The spelling of the method name is kept because subclasses override it by this name.
        """

    def read(self) -> None:
        """
        Method that performs the read operation of the input data objects of the component.

        Does nothing when ``self.input_data_objects`` is None or empty.
        """
        # None is the value set in __init__; treat it as "no inputs" instead of failing on .values()
        if self.input_data_objects is None:
            return
        for data_object in self.input_data_objects.values():
            data_object.read()

    @abstractmethod
    def transform(self):
        """
        Method that performs the data transformations needed to set the dataframes of the output
         data objects from the input data objects.
        """

    def write(self) -> None:
        """
        Method that performs the write operation of the output data objects.

        Does nothing when ``self.output_data_objects`` is None or empty.
        """
        # None is the value set in __init__; treat it as "no outputs" instead of failing on .values()
        if self.output_data_objects is None:
            return
        for data_object in self.output_data_objects.values():
            data_object.write()

    @get_execution_stats
    def execute(self):
        """
        Method that performs the read, transform and write methods of the component.

        A start message is logged before reading and a finish message after writing. The method
        is wrapped by ``get_execution_stats``, which is defined outside this class.
        """
        self.logger.info(f"Starting {self.COMPONENT_ID}...")
        self.read()
        self.transform()
        self.write()
        self.logger.info(f"Finished {self.COMPONENT_ID}")

    def log_config(self) -> None:
        """
        Method that logs all sections and key-value pairs of a ConfigParser object.

        Each section is logged as "[section]" followed by one "key: value" message per option and
        an empty message. Returns without logging when the configuration or the logger is None.
        """
        # Validation
        if self.config is None or self.logger is None:
            return

        # Log each section in order
        for section in self.config.sections():
            self.logger.info(f"[{section}]")
            for key, value in self.config.items(section):
                self.logger.info(f"{key}: {value}")
            # Break line for each section
            self.logger.info("")

execute()

Method that performs the read, transform and write methods of the component.

Source code in multimno/core/component.py (lines 65–74)
@get_execution_stats
def execute(self):
    """
    Run the component end to end: read the inputs, transform them and write the outputs.

    A start message is logged before the first stage and a finish message after the last one.
    """
    self.logger.info(f"Starting {self.COMPONENT_ID}...")
    # Each stage is looked up on the instance right before it runs, in this fixed order
    for stage in ("read", "transform", "write"):
        getattr(self, stage)()
    self.logger.info(f"Finished {self.COMPONENT_ID}")

initalize_data_objects() abstractmethod

Method that initializes the data objects associated with the component.

Source code in multimno/core/component.py (lines 38–42)
@abstractmethod
def initalize_data_objects(self) -> None:
    """
    Method that initializes the data objects associated with the component.

    Abstract: every concrete component must provide its own implementation. The body here is
    empty. Presumably implementations assign the component's input and output data objects;
    confirm against the concrete components.
    """

log_config()

Method that logs all sections and key-value pairs of a ConfigParser object.

Source code in multimno/core/component.py (lines 76–90)
def log_config(self):
    """
    Write the component configuration to the logger, section by section.

    Every section name is logged as "[section]", followed by one "key: value" message per option
    and an empty message as separator. Nothing is logged when the configuration or the logger
    is None.
    """
    # Validation: nothing to log, or nowhere to log it
    if self.config is None:
        return
    if self.logger is None:
        return

    emit = self.logger.info
    config = self.config

    # Sections are visited in the order reported by the parser
    for section in config.sections():
        emit(f"[{section}]")
        for key, value in config.items(section):
            emit(f"{key}: {value}")
        # Empty message separates consecutive sections
        emit("")

read()

Method that performs the read operation of the input data objects of the component.

Source code in multimno/core/component.py (lines 44–49)
def read(self) -> None:
    """
    Method that performs the read operation of the input data objects of the component.

    Calls ``read()`` on every value of ``self.input_data_objects``, in mapping order. Does nothing
    when ``self.input_data_objects`` is None or empty.
    """
    # An unset (None) mapping means "no inputs"; skip instead of failing on None.values()
    if self.input_data_objects is None:
        return
    for data_object in self.input_data_objects.values():
        data_object.read()

transform() abstractmethod

Method that performs the data transformations needed to set the dataframes of the output data objects from the input data objects.

Source code in multimno/core/component.py (lines 51–56)
@abstractmethod
def transform(self) -> None:
    """
    Method that performs the data transformations needed to set the dataframes of the output
    data objects from the input data objects.

    Abstract: every concrete component must provide its own implementation. The body here is
    empty.
    """

write()

Method that performs the write operation of the output data objects.

Source code in multimno/core/component.py (lines 58–63)
def write(self) -> None:
    """
    Method that performs the write operation of the output data objects.

    Calls ``write()`` on every value of ``self.output_data_objects``, in mapping order. Does
    nothing when ``self.output_data_objects`` is None or empty.
    """
    # An unset (None) mapping means "no outputs"; skip instead of failing on None.values()
    if self.output_data_objects is None:
        return
    for data_object in self.output_data_objects.values():
        data_object.write()