data_object

Module that defines the data object abstract classes

DataObject

Abstract class that models a DataObject. It defines its data schema, including the attributes that compose it.

Source code in multimno/core/data_objects/data_object.py
class DataObject(metaclass=ABCMeta):
    """
    Abstract class that models a DataObject. It defines its data schema including the attributes that compose it.
    """

    ID: str = None
    SCHEMA: StructType = None

    def __init__(self, spark: SparkSession) -> None:
        self.df: DataFrame = None
        self.spark: SparkSession = spark
        self.interface: IOInterface = None

    def read(self, *args, **kwargs):
        """
        Method that performs the read operation of the data object dataframe through an IOInterface.
        """
        self.df = self.interface.read_from_interface(*args, **kwargs)
        return self

    def write(self, *args, **kwargs):
        """
        Method that performs the write operation of the data object dataframe through an IOInterface.
        """
        self.interface.write_from_interface(self.df, *args, **kwargs)

    def cast_to_schema(self):
        columns = {field.name: F.col(field.name).cast(field.dataType) for field in self.SCHEMA.fields}
        self.df = self.df.withColumns(columns)
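
A concrete data object is expected to fill in ID and SCHEMA and to have an IOInterface assigned before read/write are called. A minimal sketch, where the class name and columns are purely illustrative and only the import of DataObject from the module documented on this page is assumed:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

from multimno.core.data_objects.data_object import DataObject


class UserEventDataObject(DataObject):  # hypothetical subclass for illustration
    ID = "UserEventDataObject"
    SCHEMA = StructType(
        [
            StructField("user_id", StringType(), nullable=False),
            StructField("event_count", IntegerType(), nullable=True),
        ]
    )


spark = SparkSession.builder.getOrCreate()
do = UserEventDataObject(spark)
# cast_to_schema() casts every column of df to the data type declared in SCHEMA.
do.df = spark.createDataFrame([("u1", "3")], ["user_id", "event_count"])
do.cast_to_schema()  # event_count is cast from string to IntegerType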

read(*args, **kwargs)

Method that performs the read operation of the data object dataframe through an IOInterface.

Source code in multimno/core/data_objects/data_object.py
def read(self, *args, **kwargs):
    """
    Method that performs the read operation of the data object dataframe through an IOInterface.
    """
    self.df = self.interface.read_from_interface(*args, **kwargs)
    return self

write(*args, **kwargs)

Method that performs the write operation of the data object dataframe through an IOInterface.

Source code in multimno/core/data_objects/data_object.py
def write(self, *args, **kwargs):
    """
    Method that performs the write operation of the data object dataframe through an IOInterface.
    """
    self.interface.write_from_interface(self.df, *args, **kwargs)
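
Since read() populates df and returns self, a read can be chained directly with attribute access, while write() hands df over to the configured interface. A usage sketch, assuming events_do is an instance of a concrete data object whose interface is already set (for example one of the PathDataObject subclasses documented below):

events_df = events_do.read().df             # read() fills .df and returns self
events_do.df = events_df.filter("event_count > 0")
events_do.write()                           # delegates to interface.write_from_interface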

GeoParquetDataObject

Bases: PathDataObject

Class that models a DataObject that will use a GeoParquetInterface for IO operations. It inherits the PathDataObject abstract class.

Source code in multimno/core/data_objects/data_object.py
class GeoParquetDataObject(PathDataObject):
    """
    Class that models a DataObject that will use a GeoParquetInterface for IO operations.
    It inherits the PathDataObject abstract class.
    """

    def __init__(
        self,
        spark: SparkSession,
        default_path: str,
        default_partition_columns: List[str] = None,
        default_mode: str = SPARK_WRITING_MODES.APPEND,
        default_crs: int = INSPIRE_GRID_EPSG,
    ) -> None:
        super().__init__(spark, default_path, default_partition_columns, default_mode)
        self.interface: PathInterface = GeoParquetInterface()
        self.default_crs = default_crs

    def read(self):
        self.df = self.interface.read_from_interface(self.spark, self.default_path, self.SCHEMA)
        self.df = self.df.withColumn(ColNames.geometry, STF.ST_SetSRID(ColNames.geometry, F.lit(self.default_crs)))

        return self
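
On read, every geometry is tagged with default_crs (the INSPIRE grid EPSG code unless overridden), so downstream Sedona functions see a consistent SRID. A hedged sketch, assuming GridGeoDataObject is a hypothetical concrete subclass whose SCHEMA contains a geometry column and that Apache Sedona is registered on the Spark session:

from sedona.sql import st_functions as STF

from multimno.core.data_objects.data_object import GeoParquetDataObject

geo_do = GridGeoDataObject(spark, default_path="/data/silver/grid")  # path is illustrative
geo_do.read()  # loads the GeoParquet files and sets the SRID on the geometry column

# Every geometry now reports the configured CRS.
geo_do.df.select(STF.ST_SRID("geometry")).show(1)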

ParquetDataObject

Bases: PathDataObject

Class that models a DataObject that will use a ParquetInterface for IO operations. It inherits the PathDataObject abstract class.

Source code in multimno/core/data_objects/data_object.py
class ParquetDataObject(PathDataObject):
    """
    Class that models a DataObject that will use a ParquetInterface for IO operations.
    It inherits the PathDataObject abstract class.
    """

    def __init__(
        self,
        spark: SparkSession,
        default_path: str,
        default_partition_columns: List[str] = None,
        default_mode: str = SPARK_WRITING_MODES.APPEND,
    ) -> None:
        super().__init__(spark, default_path, default_partition_columns, default_mode)
        self.interface: PathInterface = ParquetInterface()
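
A concrete Parquet data object typically pins down ID, SCHEMA and PARTITION_COLUMNS so that callers only have to provide a path. A minimal sketch, with illustrative class and column names:

from pyspark.sql.types import StructType, StructField, StringType, DateType

from multimno.core.data_objects.data_object import ParquetDataObject


class DailyEventsDataObject(ParquetDataObject):  # hypothetical subclass
    ID = "DailyEventsDataObject"
    SCHEMA = StructType(
        [
            StructField("user_id", StringType(), nullable=False),
            StructField("event_date", DateType(), nullable=False),
        ]
    )
    PARTITION_COLUMNS = ["event_date"]


events = DailyEventsDataObject(spark, default_path="/data/bronze/daily_events")
events.read()    # reads from default_path, applying SCHEMA
events.write()   # appends back to default_path, partitioned by event_date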

PathDataObject

Bases: DataObject

Abstract Class that models DataObjects that will use a PathInterface for IO operations. It inherits the DataObject abstract class.

Source code in multimno/core/data_objects/data_object.py
class PathDataObject(DataObject, metaclass=ABCMeta):
    """Abstract Class that models DataObjects that will use a PathInterface for IO operations.
    It inherits the DataObject abstract class.
    """

    ID = ...
    SCHEMA = ...
    PARTITION_COLUMNS = ...

    def __init__(
        self,
        spark: SparkSession,
        default_path: str,
        default_partition_columns: List[str] = None,
        default_mode: str = SPARK_WRITING_MODES.APPEND,
    ) -> None:
        super().__init__(spark)
        self.interface: PathInterface = None
        self.default_path: str = default_path
        if default_partition_columns is None:
            default_partition_columns = self.PARTITION_COLUMNS
        self.default_partition_columns: List[str] = default_partition_columns
        self.default_mode: str = default_mode

    def read(self, *args, path: str = None, **kwargs):
        if path is None:
            path = self.default_path
        self.df = self.interface.read_from_interface(self.spark, path, self.SCHEMA)

        return self

    def write(self, *args, path: str = None, partition_columns: list[str] = None, mode: str = None, **kwargs):
        if path is None:
            path = self.default_path
        if partition_columns is None:
            partition_columns = self.default_partition_columns
        if mode is None:
            mode = self.default_mode

        self.interface.write_from_interface(self.df, path=path, partition_columns=partition_columns, mode=mode)

    def get_size(self) -> int:
        """
        Returns the size of the data object in bytes.
        """
        files = self.df.inputFiles()

        if len(files) == 0:
            return 0

        conf = self.spark._jsc.hadoopConfiguration()
        # need to get proper URI prefix for the file system
        uri = self.spark._jvm.java.net.URI.create(files[0])
        fs = self.spark._jvm.org.apache.hadoop.fs.FileSystem.get(uri, conf)
        total_size = 0

        for file in files:
            total_size += fs.getFileStatus(self.spark._jvm.org.apache.hadoop.fs.Path(file)).getLen()

        return total_size

    def get_num_files(self) -> int:
        """
        Returns the number of files of the data object.
        """
        return len(self.df.inputFiles())

    def get_top_rows(self, n: int, truncate: int = 20) -> str:
        """
        Returns string with top n rows. Same as df.show.
        """
        return self.df._jdf.showString(n, truncate, False)
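
All constructor defaults can be overridden on each call of read() and write(). A short sketch, reusing the hypothetical events data object from the sketch above; the mode value is a plain Spark writing mode string, which SPARK_WRITING_MODES presumably also exposes as a constant:

events.read(path="/data/bronze/daily_events/2023")    # read a non-default path
events.write(
    path="/data/silver/daily_events",                 # write to a different location
    partition_columns=["event_date", "user_id"],      # override PARTITION_COLUMNS
    mode="overwrite",                                  # plain Spark writing mode string
)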

get_num_files()

Returns the number of files of the data object.

Source code in multimno/core/data_objects/data_object.py
def get_num_files(self) -> int:
    """
    Returns the number of files of the data object.
    """
    return len(self.df.inputFiles())

get_size()

Returns the size of the data object in bytes.

Source code in multimno/core/data_objects/data_object.py
def get_size(self) -> int:
    """
    Returns the size of the data object in bytes.
    """
    files = self.df.inputFiles()

    if len(files) == 0:
        return 0

    conf = self.spark._jsc.hadoopConfiguration()
    # need to get proper URI prefix for the file system
    uri = self.spark._jvm.java.net.URI.create(files[0])
    fs = self.spark._jvm.org.apache.hadoop.fs.FileSystem.get(uri, conf)
    total_size = 0

    for file in files:
        total_size += fs.getFileStatus(self.spark._jvm.org.apache.hadoop.fs.Path(file)).getLen()

    return total_size

get_top_rows(n, truncate=20)

Returns a string with the top n rows. Same as df.show.

Source code in multimno/core/data_objects/data_object.py
def get_top_rows(self, n: int, truncate: int = 20) -> str:
    """
    Returns string with top n rows. Same as df.show.
    """
    return self.df._jdf.showString(n, truncate, False)
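
The inspection helpers are convenient for logging right after a read. A short usage sketch, again with the hypothetical events data object from above:

events.read()
size_mib = events.get_size() / 1024**2
print(f"Read {events.get_num_files()} files, {size_mib:.1f} MiB")
print(events.get_top_rows(5))  # same string df.show(5) would print, values truncated to 20 characters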