data_object

Module that defines the data object abstract classes

DataObject

Abstract class that models a DataObject. It defines its data schema including the attributes that compose it.

Source code in multimno/core/data_objects/data_object.py
class DataObject(metaclass=ABCMeta):
    """
    Abstract class that models a DataObject. It defines its data schema including the attributes that compose it.
    """

    ID: str = None
    SCHEMA: StructType = None

    def __init__(self, spark: SparkSession) -> None:
        self.df: DataFrame = None
        self.spark: SparkSession = spark
        self.interface: IOInterface = None

    def read(self, *args, **kwargs):
        """
        Method that performs the read operation of the data object dataframe through an IOInterface.
        """
        self.df = self.interface.read_from_interface(*args, **kwargs)
        return self

    def write(self, *args, **kwargs):
        """
        Method that performs the write operation of the data object dataframe through an IOInterface.
        """
        self.interface.write_from_interface(self.df, *args, **kwargs)

    def cast_to_schema(self):
        columns = {field.name: F.col(field.name).cast(field.dataType) for field in self.SCHEMA.fields}
        self.df = self.df.withColumns(columns)
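
As a minimal sketch, a concrete subclass is expected to declare an ID and a SCHEMA and to assign an IOInterface implementation; the class name, columns and interface choice below are hypothetical and only illustrate the pattern.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ShortType

class IllustrativeDataObject(DataObject):
    """Hypothetical concrete data object, shown only for illustration."""

    ID = "IllustrativeDO"
    SCHEMA = StructType(
        [
            StructField("user_id", StringType(), nullable=False),
            StructField("cell_id", StringType(), nullable=True),
            StructField("year", ShortType(), nullable=True),
        ]
    )

    def __init__(self, spark: SparkSession) -> None:
        super().__init__(spark)
        # A concrete subclass assigns the IOInterface it reads and writes through;
        # None is kept here because the interface choice is out of scope for this sketch.
        self.interface = None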

read(*args, **kwargs)

Method that performs the read operation of the data object's dataframe through an IOInterface.

Source code in multimno/core/data_objects/data_object.py
def read(self, *args, **kwargs):
    """
    Method that performs the read operation of the data object dataframe through an IOInterface.
    """
    self.df = self.interface.read_from_interface(*args, **kwargs)
    return self

write(*args, **kwargs)

Method that performs the write operation of the data object's dataframe through an IOInterface.

Source code in multimno/core/data_objects/data_object.py
def write(self, *args, **kwargs):
    """
    Method that performs the write operation of the data object dataframe through an IOInterface.
    """
    self.interface.write_from_interface(self.df, *args, **kwargs)
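
In typical usage the two operations are chained: read() loads the dataframe through the configured IOInterface and returns the data object itself, so schema casting and writing can follow directly. A hedged usage sketch, assuming data_object is an instance of a concrete subclass whose interface and schema are already set:

data_object.read()            # populates data_object.df through the IOInterface
data_object.cast_to_schema()  # casts the columns to the types declared in SCHEMA
data_object.write()           # persists data_object.df through the IOInterface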

GeoParquetDataObject

Bases: PathDataObject

Class that models a DataObject that will use a GeoParquetInterface for IO operations. It inherits the PathDataObject abstract class.

Source code in multimno/core/data_objects/data_object.py
class GeoParquetDataObject(PathDataObject):
    """
    Class that models a DataObject that will use a GeoParquetInterface for IO operations.
    It inherits the PathDataObject abstract class.
    """

    def __init__(
        self,
        spark: SparkSession,
        default_path: str,
        default_partition_columns: List[str] = None,
        default_mode: str = SPARK_WRITING_MODES.APPEND,
        default_crs: int = INSPIRE_GRID_EPSG,
        set_crs: bool = True,
    ) -> None:
        super().__init__(spark, default_path, default_partition_columns, default_mode)
        self.interface: PathInterface = GeoParquetInterface()
        self.default_crs = default_crs
        self.set_crs = set_crs

    def read(self):
        self.df = self.interface.read_from_interface(self.spark, self.default_path, self.SCHEMA)

        if self.set_crs:
            self.df = self.df.withColumn(
                ColNames.geometry, STF.ST_SetSRID((ColNames.geometry), F.lit(self.default_crs))
            )

        return self
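
A usage sketch under the assumption that the Spark session is Sedona-enabled and that SomeGeoDataObject is a hypothetical subclass whose SCHEMA includes the ColNames.geometry column; the path is illustrative only.

geo_do = SomeGeoDataObject(
    spark,
    default_path="/data/illustrative/geo",  # hypothetical location of the GeoParquet data
    default_crs=INSPIRE_GRID_EPSG,           # EPSG code stamped onto the geometry on read
    set_crs=True,
)
geo_do.read()   # reads GeoParquet and applies ST_SetSRID to the geometry column
geo_do.write()  # writes through the GeoParquetInterface using the inherited defaults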

ParquetDataObject

Bases: PathDataObject

Class that models a DataObject that will use a ParquetInterface for IO operations. It inherits the PathDataObject abstract class.

Source code in multimno/core/data_objects/data_object.py
class ParquetDataObject(PathDataObject):
    """
    Class that models a DataObject that will use a ParquetInterface for IO operations.
    It inherits the PathDataObject abstract class.
    """

    def __init__(
        self,
        spark: SparkSession,
        default_path: str,
        default_partition_columns: List[str] = None,
        default_mode: str = SPARK_WRITING_MODES.APPEND,
    ) -> None:
        super().__init__(spark, default_path, default_partition_columns, default_mode)
        self.interface: PathInterface = ParquetInterface()
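
A concrete data object would typically subclass ParquetDataObject and declare its ID, SCHEMA and PARTITION_COLUMNS; the subclass below is hypothetical and only sketches that shape.

from pyspark.sql.types import StructType, StructField, StringType, ShortType

class IllustrativeParquetDataObject(ParquetDataObject):
    """Hypothetical subclass, not part of the library."""

    ID = "IllustrativeParquetDO"
    SCHEMA = StructType(
        [
            StructField("device_id", StringType(), nullable=False),
            StructField("year", ShortType(), nullable=True),
        ]
    )
    PARTITION_COLUMNS = ["year"]

do = IllustrativeParquetDataObject(spark, default_path="/data/illustrative/events")
do.read().write()  # read from default_path, then write back with the default partitioning and mode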

PathDataObject

Bases: DataObject

Abstract Class that models DataObjects that will use a PathInterface for IO operations. It inherits the DataObject abstract class.

Source code in multimno/core/data_objects/data_object.py
class PathDataObject(DataObject, metaclass=ABCMeta):
    """Abstract Class that models DataObjects that will use a PathInterface for IO operations.
    It inherits the DataObject abstract class.
    """

    ID = ...
    SCHEMA = ...
    PARTITION_COLUMNS = ...

    def __init__(
        self,
        spark: SparkSession,
        default_path: str,
        default_partition_columns: List[str] = None,
        default_mode: str = SPARK_WRITING_MODES.APPEND,
    ) -> None:
        super().__init__(spark)
        self.interface: PathInterface = None
        self.default_path: str = default_path
        if default_partition_columns is None:
            default_partition_columns = self.PARTITION_COLUMNS
        self.default_partition_columns: List[str] = default_partition_columns
        self.default_mode: str = default_mode

    def read(self, *args, path: str = None, **kwargs):
        if path is None:
            path = self.default_path
        self.df = self.interface.read_from_interface(self.spark, path, self.SCHEMA)

        return self

    def write(self, *args, path: str = None, partition_columns: list[str] = None, mode: str = None, **kwargs):
        if path is None:
            path = self.default_path
        if partition_columns is None:
            partition_columns = self.default_partition_columns
        if mode is None:
            mode = self.default_mode

        self.interface.write_from_interface(self.df, path=path, partition_columns=partition_columns, mode=mode)

    def get_size(self) -> int:
        """
        Returns the size of the data object in bytes.
        """
        files = self.df.inputFiles()

        if len(files) == 0:
            return 0

        conf = self.spark._jsc.hadoopConfiguration()
        # need to get proper URI prefix for the file system
        uri = self.spark._jvm.java.net.URI.create(files[0])
        fs = self.spark._jvm.org.apache.hadoop.fs.FileSystem.get(uri, conf)
        total_size = 0

        for file in files:
            total_size += fs.getFileStatus(self.spark._jvm.org.apache.hadoop.fs.Path(file)).getLen()

        return total_size

    def get_num_files(self) -> int:
        """
        Returns the number of files of the data object.
        """
        return len(self.df.inputFiles())

    def get_top_rows(self, n: int, truncate: int = 20) -> str:
        """
        Returns string with top n rows. Same as df.show.
        """
        return self.df._jdf.showString(n, truncate, False)
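
The write method resolves any argument that is not given from the defaults captured at construction time (default_path, default_partition_columns, default_mode), so callers can rely on the configured defaults or override them per call. A hedged usage sketch, where path_do stands for an instance of some concrete subclass:

path_do.write()  # uses default_path, default_partition_columns and default_mode

# Overrides only the destination; partitioning and mode still come from the defaults.
path_do.write(path="/tmp/illustrative_output")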

get_num_files()

Returns the number of files of the data object.

Source code in multimno/core/data_objects/data_object.py
def get_num_files(self) -> int:
    """
    Returns the number of files of the data object.
    """
    return len(self.df.inputFiles())

get_size()

Returns the size of the data object in bytes.

Source code in multimno/core/data_objects/data_object.py
def get_size(self) -> int:
    """
    Returns the size of the data object in bytes.
    """
    files = self.df.inputFiles()

    if len(files) == 0:
        return 0

    conf = self.spark._jsc.hadoopConfiguration()
    # need to get proper URI prefix for the file system
    uri = self.spark._jvm.java.net.URI.create(files[0])
    fs = self.spark._jvm.org.apache.hadoop.fs.FileSystem.get(uri, conf)
    total_size = 0

    for file in files:
        total_size += fs.getFileStatus(self.spark._jvm.org.apache.hadoop.fs.Path(file)).getLen()

    return total_size
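
Because the size is derived from the files backing the dataframe via the Hadoop FileSystem API, it is only meaningful after a read has populated df; if the dataframe has no backing files the method returns 0. A small hedged usage sketch, with path_do again standing for a concrete subclass instance:

path_do.read()
size_mib = path_do.get_size() / (1024 * 1024)  # bytes reported by the Hadoop FileSystem
print(f"{path_do.ID}: {path_do.get_num_files()} files, {size_mib:.1f} MiB")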

get_top_rows(n, truncate=20)

Returns a string with the top n rows. Same as df.show().

Source code in multimno/core/data_objects/data_object.py
def get_top_rows(self, n: int, truncate: int = 20) -> str:
    """
    Returns string with top n rows. Same as df.show.
    """
    return self.df._jdf.showString(n, truncate, False)