io_interface

Module that implements classes for reading data from different data sources into Spark DataFrames.
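
For orientation, the sketch below shows the intended call pattern. It is a minimal example under stated assumptions: the SparkSession configuration, the schema, and the paths are illustrative and not taken from the repository.

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType

from multimno.core.io_interface import CsvInterface, ParquetInterface

# Illustrative session; a real pipeline would configure Spark centrally.
spark = SparkSession.builder.appName("io_interface_example").getOrCreate()

# Hypothetical schema and paths, shown only to demonstrate the call pattern.
schema = StructType([StructField("user_id", StringType(), True)])
events_df = CsvInterface().read_from_interface(spark, "/tmp/events.csv", schema)
ParquetInterface().write_from_interface(events_df, "/tmp/events_parquet")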

CsvInterface

Bases: PathInterface

Class that implements the PathInterface abstract class for reading/writing data from a csv data source.

Source code in multimno/core/io_interface.py
class CsvInterface(PathInterface):
    """Class that implements the PathInterface abstract class for reading/writing data from a csv data source."""

    FILE_FORMAT = "csv"

    def read_from_interface(
        self,
        spark: SparkSession,
        path: str,
        schema: StructType,
        header: bool = True,
        sep: str = ",",
    ) -> DataFrame:
        """Method that reads data from a csv type data source as a Spark DataFrame.

        Args:
            spark (SparkSession): Spark session.
            path (str): Path to the data.
            schema (StructType): Schema of the data.
            header (bool, optional): Whether the first row is a header. Defaults to True.
            sep (str, optional): Column separator. Defaults to ",".

        Returns:
            df: Spark dataframe.
        """
        return spark.read.csv(path, schema=schema, header=header, sep=sep)

    def write_from_interface(
        self,
        df: DataFrame,
        path: str,
        partition_columns: List[str] = None,
        header: bool = True,
        sep: str = ",",
    ):
        """Method that writes data from a Spark DataFrame to a csv data source.

        Args:
            df (DataFrame): Spark DataFrame.
            path (str): Path to the data.
            partition_columns (List[str], optional): Columns used for a partitioned write.
                Currently not applied by this writer. Defaults to None.
            header (bool, optional): Whether to write a header row. Defaults to True.
            sep (str, optional): Column separator. Defaults to ",".
        """
        if partition_columns is None:
            partition_columns = []
        df.write.option("header", header).option("sep", sep).mode("overwrite").format("csv").save(path)

read_from_interface(spark, path, schema, header=True, sep=',')

Method that reads data from a csv type data source as a Spark DataFrame.

Parameters:

Name    Type          Description                          Default
spark   SparkSession  Spark session.                       required
path    str           Path to the data.                    required
schema  StructType    Schema of the data.                  required
header  bool          Whether the first row is a header.   True
sep     str           Column separator.                    ','

Returns:

Name  Type       Description
df    DataFrame  Spark DataFrame.

Source code in multimno/core/io_interface.py
def read_from_interface(
    self,
    spark: SparkSession,
    path: str,
    schema: StructType,
    header: bool = True,
    sep: str = ",",
) -> DataFrame:
    """Method that reads data from a csv type data source as a Spark DataFrame.

    Args:
        spark (SparkSession): Spark session.
        path (str): Path to the data.
        schema (StructType): Schema of the data.
        header (bool, optional): Whether the first row is a header. Defaults to True.
        sep (str, optional): Column separator. Defaults to ",".

    Returns:
        df: Spark dataframe.
    """
    return spark.read.csv(path, schema=schema, header=header, sep=sep)
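
A short usage sketch, assuming an existing SparkSession and an illustrative semicolon-separated file (the schema and path are assumptions, not values from the source):

from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

from multimno.core.io_interface import CsvInterface

spark = SparkSession.builder.getOrCreate()

# Hypothetical schema for an example file at /tmp/cells.csv.
schema = StructType(
    [
        StructField("cell_id", StringType(), True),
        StructField("event_count", IntegerType(), True),
    ]
)
cells_df = CsvInterface().read_from_interface(spark, "/tmp/cells.csv", schema, header=True, sep=";")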

write_from_interface(df, path, partition_columns=None, header=True, sep=',')

Method that writes data from a Spark DataFrame to a csv data source.

Parameters:

Name               Type       Description                                           Default
df                 DataFrame  Spark DataFrame.                                      required
path               str        Path to the data.                                     required
partition_columns  List[str]  Columns for a partitioned write (currently unused).   None
header             bool       Whether to write a header row.                        True
sep                str        Column separator.                                     ','

Source code in multimno/core/io_interface.py
def write_from_interface(
    self,
    df: DataFrame,
    path: str,
    partition_columns: List[str] = None,
    header: bool = True,
    sep: str = ",",
):
    """Method that writes data from a Spark DataFrame to a csv data source.

    Args:
        df (DataFrame): Spark DataFrame.
        path (str): Path to the data.
        partition_columns (List[str], optional): Columns used for a partitioned write.
            Currently not applied by this writer. Defaults to None.
        header (bool, optional): Whether to write a header row. Defaults to True.
        sep (str, optional): Column separator. Defaults to ",".
    """
    if partition_columns is None:
        partition_columns = []
    df.write.option("header", header).option("sep", sep).mode("overwrite").format("csv").save(path)
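
Note that the implementation always writes in overwrite mode and does not apply partition_columns. A minimal sketch, reusing the cells_df DataFrame from the reading example above and an illustrative output path:

from multimno.core.io_interface import CsvInterface

# cells_df is assumed to be an existing Spark DataFrame (see the reading example above).
CsvInterface().write_from_interface(cells_df, "/tmp/cells_out", header=True, sep=";")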

GeoParquetInterface

Bases: PathInterface

Class that implements the PathInterface abstract class for reading/writing data from a geoparquet data source.

Source code in multimno/core/io_interface.py
class GeoParquetInterface(PathInterface):
    """Class that implements the PathInterface abstract class for reading/writing data from a geoparquet data source."""

    FILE_FORMAT = "geoparquet"

HttpGeoJsonInterface

Bases: IOInterface

Class that implements the IO interface abstract class for reading GeoJSON data from an HTTP source.

Source code in multimno/core/io_interface.py
class HttpGeoJsonInterface(IOInterface):
    """Class that implements the IO interface abstract class for reading GeoJSON data from an HTTP source."""

    def read_from_interface(self, spark: SparkSession, url: str, timeout: int = 60, max_retries: int = 5) -> DataFrame:
        """Method that reads GeoJSON data from an HTTP source and converts it to a Spark DataFrame.

        Args:
            spark (SparkSession): Spark session.
            url (str): URL of the GeoJSON data.
            timeout (int): Timeout for the GET request in seconds. Default is 60.
            max_retries (int): Maximum number of retries for the GET request. Default is 5.

        Returns:
            df: Spark DataFrame.
        """
        session = requests.Session()
        retry = Retry(total=max_retries, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
        adapter = HTTPAdapter(max_retries=retry)
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        try:
            response = session.get(url, timeout=timeout)
        except requests.exceptions.RequestException as e:
            print(e)
            raise Exception("Maximum number of retries exceeded.")

        if response.status_code != 200:
            raise Exception("GET request not successful.")

        # Read the GeoJSON data into a GeoDataFrame
        gdf = gpd.read_file(StringIO(response.text))

        # Convert the GeoDataFrame to a Spark DataFrame
        df = spark.createDataFrame(gdf)

        return df

    def write_from_interface(self, df: DataFrame, url: str, timeout: int = 60, max_retries: int = 5):
        """Method that writes a DataFrame to an HTTP source as GeoJSON data.

        Args:
            df (DataFrame): DataFrame to write.
            url (str): URL of the HTTP source.
            timeout (int): Timeout for the POST request in seconds. Default is 60.
            max_retries (int): Maximum number of retries for the POST request. Default is 5.
        """
        raise NotImplementedError("This method is not implemented.")

read_from_interface(spark, url, timeout=60, max_retries=5)

Method that reads GeoJSON data from an HTTP source and converts it to a Spark DataFrame.

Parameters:

Name         Type          Description                                      Default
spark        SparkSession  Spark session.                                   required
url          str           URL of the GeoJSON data.                         required
timeout      int           Timeout for the GET request in seconds.          60
max_retries  int           Maximum number of retries for the GET request.   5

Returns:

Name  Type       Description
df    DataFrame  Spark DataFrame.

Source code in multimno/core/io_interface.py
def read_from_interface(self, spark: SparkSession, url: str, timeout: int = 60, max_retries: int = 5) -> DataFrame:
    """Method that reads GeoJSON data from an HTTP source and converts it to a Spark DataFrame.

    Args:
        spark (SparkSession): Spark session.
        url (str): URL of the GeoJSON data.
        timeout (int): Timeout for the GET request in seconds. Default is 60.
        max_retries (int): Maximum number of retries for the GET request. Default is 5.

    Returns:
        df: Spark DataFrame.
    """
    session = requests.Session()
    retry = Retry(total=max_retries, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)

    try:
        response = session.get(url, timeout=timeout)
    except requests.exceptions.RequestException as e:
        print(e)
        raise Exception("Maximum number of retries exceeded.")

    if response.status_code != 200:
        raise Exception("GET request not successful.")

    # Read the GeoJSON data into a GeoDataFrame
    gdf = gpd.read_file(StringIO(response.text))

    # Convert the GeoDataFrame to a Spark DataFrame
    df = spark.createDataFrame(gdf)

    return df
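
A usage sketch with an illustrative URL; depending on the geometry types in the downloaded GeoDataFrame, a Sedona-enabled SparkSession may be needed for createDataFrame to accept the geometry column:

from pyspark.sql import SparkSession

from multimno.core.io_interface import HttpGeoJsonInterface

spark = SparkSession.builder.getOrCreate()

# Illustrative URL; any HTTP(S) endpoint serving a GeoJSON document works the same way.
url = "https://example.com/regions.geojson"
regions_df = HttpGeoJsonInterface().read_from_interface(spark, url, timeout=30, max_retries=3)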

write_from_interface(df, url, timeout=60, max_retries=5)

Method that writes a DataFrame to an HTTP source as GeoJSON data.

Parameters:

Name         Type       Description                                       Default
df           DataFrame  DataFrame to write.                               required
url          str        URL of the HTTP source.                           required
timeout      int        Timeout for the POST request in seconds.          60
max_retries  int        Maximum number of retries for the POST request.   5

Raises: NotImplementedError: writing GeoJSON data to an HTTP source is not implemented.

Source code in multimno/core/io_interface.py
def write_from_interface(self, df: DataFrame, url: str, timeout: int = 60, max_retries: int = 5):
    """Method that writes a DataFrame to an HTTP source as GeoJSON data.

    Args:
        df (DataFrame): DataFrame to write.
        url (str): URL of the HTTP source.
        timeout (int): Timeout for the POST request in seconds. Default is 60.
        max_retries (int): Maximum number of retries for the POST request. Default is 5.
    """
    raise NotImplementedError("This method is not implemented.")

IOInterface

Abstract interface that provides functionality for reading and writing data

Source code in multimno/core/io_interface.py
class IOInterface(metaclass=ABCMeta):
    """Abstract interface that provides functionality for reading and writing data"""

    @classmethod
    def __subclasshook__(cls, subclass: type) -> bool:
        if cls is IOInterface:
            attrs: List[str] = []
            callables: List[str] = ["read_from_interface", "write_from_interface"]
            ret: bool = True
            for attr in attrs:
                ret = ret and (hasattr(subclass, attr) and isinstance(getattr(subclass, attr), property))
            for call in callables:
                ret = ret and (hasattr(subclass, call) and callable(getattr(subclass, call)))
            return ret
        else:
            return NotImplemented

    @abstractmethod
    def read_from_interface(self, *args, **kwargs) -> DataFrame:
        pass

    @abstractmethod
    def write_from_interface(self, df: DataFrame, *args, **kwargs):
        pass
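
The __subclasshook__ makes the subclass check structural: any class exposing callable read_from_interface and write_from_interface attributes is reported as a subclass, even without inheriting from IOInterface. A small hypothetical demonstration:

from multimno.core.io_interface import IOInterface

class InMemorySource:
    """Hypothetical class that does not inherit from IOInterface."""

    def read_from_interface(self, *args, **kwargs):
        return None

    def write_from_interface(self, df, *args, **kwargs):
        return None

print(issubclass(InMemorySource, IOInterface))  # True, via __subclasshook__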

JsonInterface

Bases: PathInterface

Class that implements the PathInterface abstract class for reading/writing data from a json data source.

Source code in multimno/core/io_interface.py
class JsonInterface(PathInterface):
    """Class that implements the PathInterface abstract class for reading/writing data from a json data source."""

    FILE_FORMAT = "json"

ParquetInterface

Bases: PathInterface

Class that implements the PathInterface abstract class for reading/writing data from a parquet data source.

Source code in multimno/core/io_interface.py
class ParquetInterface(PathInterface):
    """Class that implements the PathInterface abstract class for reading/writing data from a parquet data source."""

    FILE_FORMAT = "parquet"

PathInterface

Bases: IOInterface

Abstract interface for reading/writing data from a file type data source.

Source code in multimno/core/io_interface.py
class PathInterface(IOInterface, metaclass=ABCMeta):
    """Abstract interface for reading/writing data from a file type data source."""

    FILE_FORMAT = ""

    def read_from_interface(self, spark: SparkSession, path: str, schema: StructType = None) -> DataFrame:
        """Method that reads data from a file type data source as a Spark DataFrame.

        Args:
            spark (SparkSession): Spark session.
            path (str): Path to the data.
            schema (StructType, optional): Schema of the data. Defaults to None.

        Returns:
            df: Spark dataframe.
        """
        if schema is None:
            return spark.read.format(self.FILE_FORMAT).load(path)
        else:
            return (
                spark.read.schema(schema).format(self.FILE_FORMAT).load(path)
            )  # Read schema  # File format  # Load path

    def write_from_interface(
        self, df: DataFrame, path: str, partition_columns: List[str] = None, mode: str = SPARK_WRITING_MODES.APPEND
    ):
        """Method that writes data from a Spark DataFrame to a file type data source.

        Args:
            df (DataFrame): Spark DataFrame.
            path (str): Path to the data.
            partition_columns (List[str], optional): Columns used for a partitioned write. Defaults to None.
            mode (str, optional): Spark writing mode. Defaults to SPARK_WRITING_MODES.APPEND.
        """
        # Args check
        if partition_columns is None:
            partition_columns = []

        df.write.format(
            self.FILE_FORMAT,  # File format
        ).partitionBy(partition_columns).mode(
            mode
        ).save(path)

read_from_interface(spark, path, schema=None)

Method that reads data from a file type data source as a Spark DataFrame.

Parameters:

Name    Type          Description                             Default
spark   SparkSession  Spark session.                          required
path    str           Path to the data.                       required
schema  StructType    Schema of the data. Defaults to None.   None

Returns:

Name  Type       Description
df    DataFrame  Spark DataFrame.

Source code in multimno/core/io_interface.py
def read_from_interface(self, spark: SparkSession, path: str, schema: StructType = None) -> DataFrame:
    """Method that reads data from a file type data source as a Spark DataFrame.

    Args:
        spark (SparkSession): Spark session.
        path (str): Path to the data.
        schema (StructType, optional): Schema of the data. Defaults to None.

    Returns:
        df: Spark dataframe.
    """
    if schema is None:
        return spark.read.format(self.FILE_FORMAT).load(path)
    else:
        return (
            spark.read.schema(schema).format(self.FILE_FORMAT).load(path)
        )  # Read schema  # File format  # Load path

write_from_interface(df, path, partition_columns=None, mode=SPARK_WRITING_MODES.APPEND)

Method that writes data from a Spark DataFrame to a file type data source.

Parameters:

Name               Type       Description                             Default
df                 DataFrame  Spark DataFrame.                        required
path               str        Path to the data.                       required
partition_columns  List[str]  Columns used for a partitioned write.   None
mode               str        Spark writing mode.                     SPARK_WRITING_MODES.APPEND

Source code in multimno/core/io_interface.py
def write_from_interface(
    self, df: DataFrame, path: str, partition_columns: List[str] = None, mode: str = SPARK_WRITING_MODES.APPEND
):
    """Method that writes data from a Spark DataFrame to a file type data source.

    Args:
        df (DataFrame): Spark DataFrame.
        path (str): Path to the data.
        partition_columns (List[str], optional): Columns used for a partitioned write. Defaults to None.
        mode (str, optional): Spark writing mode. Defaults to SPARK_WRITING_MODES.APPEND.
    """
    # Args check
    if partition_columns is None:
        partition_columns = []

    df.write.format(
        self.FILE_FORMAT,  # File format
    ).partitionBy(partition_columns).mode(
        mode
    ).save(path)
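
Since PathInterface is abstract, the sketch below uses ParquetInterface as a concrete subclass to show overriding the default append mode; the DataFrame and path are hypothetical, and "overwrite" is a standard Spark writing mode string:

from pyspark.sql import SparkSession

from multimno.core.io_interface import ParquetInterface

spark = SparkSession.builder.getOrCreate()

# Hypothetical DataFrame and output path, used only to illustrate the mode argument.
df = spark.createDataFrame([(1, "2023"), (2, "2024")], ["id", "year"])
ParquetInterface().write_from_interface(df, "/tmp/demo_output", partition_columns=["year"], mode="overwrite")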

ShapefileInterface

Bases: PathInterface

Class that implements the PathInterface abstract class for reading/writing data from a ShapeFile data source.

Source code in multimno/core/io_interface.py
class ShapefileInterface(PathInterface):
    """Class that implements the PathInterface abstract class for reading/writing data from a ShapeFile data source."""

    def read_from_interface(self, spark: SparkSession, path: str, schema: StructType = None) -> DataFrame:
        """Method that reads data from a ShapeFile type data source as a Spark DataFrame.

        Args:
            spark (SparkSession): Spark session.
            path (str): Path to the data.
            schema (StructType, optional): Schema of the data. Defaults to None.

        Returns:
            df: Spark dataframe.
        """
        df = ShapefileReader.readToGeometryRDD(spark.sparkContext, path)
        return Adapter.toDf(df, spark)

    def write_from_interface(self, df: DataFrame, path: str, partition_columns: list = None):
        """Method that writes data from a Spark DataFrame to a ShapeFile data source.

        Args:
            df (DataFrame): Spark DataFrame.
            path (str): Path to the data.
            partition_columns (List[str], optional): columns used for a partition write.
        Raises:
            NotImplementedError: ShapeFile files should not be written in this architecture.
        """
        raise NotImplementedError("Not implemented as Shapefiles shouldn't be written")

read_from_interface(spark, path, schema=None)

Method that reads data from a ShapeFile type data source as a Spark DataFrame.

Parameters:

Name    Type          Description                             Default
spark   SparkSession  Spark session.                          required
path    str           Path to the data.                       required
schema  StructType    Schema of the data. Defaults to None.   None

Returns:

Name  Type       Description
df    DataFrame  Spark DataFrame.

Source code in multimno/core/io_interface.py
def read_from_interface(self, spark: SparkSession, path: str, schema: StructType = None) -> DataFrame:
    """Method that reads data from a ShapeFile type data source as a Spark DataFrame.

    Args:
        spark (SparkSession): Spark session.
        path (str): Path to the data.
        schema (StructType, optional): Schema of the data. Defaults to None.

    Returns:
        df: Spark dataframe.
    """
    df = ShapefileReader.readToGeometryRDD(spark.sparkContext, path)
    return Adapter.toDf(df, spark)
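
Reading goes through Sedona's ShapefileReader, so a Sedona-enabled SparkSession is assumed; the path is illustrative and should point to a directory containing the .shp/.shx/.dbf files of one layer:

from sedona.spark import SedonaContext

from multimno.core.io_interface import ShapefileInterface

# Assumes Apache Sedona is installed and registered on the session.
config = SedonaContext.builder().getOrCreate()
spark = SedonaContext.create(config)

municipalities_df = ShapefileInterface().read_from_interface(spark, "/tmp/municipalities_shapefile")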

write_from_interface(df, path, partition_columns=None)

Method that writes data from a Spark DataFrame to a ShapeFile data source.

Parameters:

Name               Type       Description                             Default
df                 DataFrame  Spark DataFrame.                        required
path               str        Path to the data.                       required
partition_columns  List[str]  Columns used for a partitioned write.   None

Raises: NotImplementedError: ShapeFile files should not be written in this architecture.

Source code in multimno/core/io_interface.py
def write_from_interface(self, df: DataFrame, path: str, partition_columns: list = None):
    """Method that writes data from a Spark DataFrame to a ShapeFile data source.

    Args:
        df (DataFrame): Spark DataFrame.
        path (str): Path to the data.
        partition_columns (List[str], optional): columns used for a partition write.
    Raises:
        NotImplementedError: ShapeFile files should not be written in this architecture.
    """
    raise NotImplementedError("Not implemented as Shapefiles shouldn't be written")