
spark_session

Module that manages the spark session.

SPARK_WRITING_MODES

Enum class to define writing modes for spark

Source code in multimno/core/spark_session.py
class SPARK_WRITING_MODES:
    """Enum class to define writing modes for spark"""

    OVERWRITE = "overwrite"
    APPEND = "append"
    IGNORE = "ignore"
    ERROR = "error"

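These constants map directly to the mode strings accepted by the Spark DataFrame writer. A minimal usage sketch (the DataFrame df and the output path are hypothetical):

from multimno.core.spark_session import SPARK_WRITING_MODES

# df is assumed to be an existing DataFrame; the output path is illustrative.
df.write.mode(SPARK_WRITING_MODES.OVERWRITE).parquet("/data/output/example_dataset")
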
check_if_data_path_exists(spark, data_path)

Checks whether data path exists, returns True if it does, False if not

Parameters:

Name       Type          Description          Default
spark      SparkSession  active SparkSession  required
data_path  str           path to check        required

Returns:

Name  Type  Description
bool  bool  Whether the passed path exists

Source code in multimno/core/spark_session.py
def check_if_data_path_exists(spark: SparkSession, data_path: str) -> bool:
    """
    Checks whether data path exists, returns True if it does, False if not

    Args:
        spark (SparkSession): active SparkSession
        data_path (str): path to check

    Returns:
        bool: Whether the passed path exists
    """
    conf = spark._jsc.hadoopConfiguration()
    uri = spark._jvm.java.net.URI.create(data_path)
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(uri, conf)
    return fs.exists(spark._jvm.org.apache.hadoop.fs.Path(data_path))

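A minimal usage sketch, assuming spark is an active SparkSession; the path is illustrative:

from multimno.core.spark_session import check_if_data_path_exists

# Read the dataset only if its path exists.
if check_if_data_path_exists(spark, "/data/bronze/events"):
    df = spark.read.parquet("/data/bronze/events")

Because the path is resolved through the Hadoop FileSystem API, the same call works for local paths, HDFS, or other supported file systems depending on the URI scheme.
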
check_or_create_data_path(spark, data_path)

Create the provided path on a file system. If path already exists, do nothing.

Parameters:

Name       Type          Description          Default
spark      SparkSession  active SparkSession  required
data_path  str           path to check        required
Source code in multimno/core/spark_session.py
def check_or_create_data_path(spark: SparkSession, data_path: str):
    """
    Create the provided path on a file system. If path already exists, do nothing.

    Args:
        spark (SparkSession): active SparkSession
        data_path (str): path to check
    """
    conf = spark._jsc.hadoopConfiguration()
    uri = spark._jvm.java.net.URI.create(data_path)
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(uri, conf)
    path = spark._jvm.org.apache.hadoop.fs.Path(data_path)
    if not fs.exists(path):
        fs.mkdirs(path)

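A minimal usage sketch, assuming an active SparkSession named spark; the output path is illustrative:

from multimno.core.spark_session import check_or_create_data_path

# Make sure the output directory exists before writing to it.
check_or_create_data_path(spark, "/data/silver/aggregations")
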
delete_file_or_folder(spark, data_path)

Deletes file or folder with given path

Parameters:

Name       Type          Description                     Default
spark      SparkSession  Currently active spark session  required
data_path  str           Path to remove                  required
Source code in multimno/core/spark_session.py
def delete_file_or_folder(spark: SparkSession, data_path: str):
    """
    Deletes file or folder with given path

    Args:
        spark (SparkSession): Currently active spark session
        data_path (str): Path to remove
    """
    conf = spark._jsc.hadoopConfiguration()
    uri = spark._jvm.java.net.URI.create(data_path)
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(uri, conf)
    path = spark._jvm.org.apache.hadoop.fs.Path(data_path)
    fs.delete(path, True)

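A minimal usage sketch, assuming an active SparkSession named spark; the path is illustrative. Note that the underlying fs.delete call sets the recursive flag, so passing a directory removes all of its contents.

from multimno.core.spark_session import delete_file_or_folder

# Remove the output of a previous run before recomputing it.
delete_file_or_folder(spark, "/data/silver/aggregations")
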
generate_spark_session(config)

Function that generates a Spark Sedona session.

Parameters:

Name    Type          Description                            Default
config  ConfigParser  Object with the final configuration.  required

Returns:

Name          Type          Description
SparkSession  SparkSession  Session of spark.

Source code in multimno/core/spark_session.py
def generate_spark_session(config: ConfigParser) -> SparkSession:
    """Function that generates a Spark Sedona session.

    Args:
        config (ConfigParser): Object with the final configuration.

    Returns:
        SparkSession: Session of spark.
    """
    conf_dict = dict(config[SPARK_CONFIG_KEY])
    master = conf_dict.pop("spark.master")
    session_name = conf_dict.pop("session_name")

    builder = SedonaContext.builder().appName(f"{session_name}").master(master)

    # Configuration file spark configs
    for k, v in conf_dict.items():
        builder = builder.config(k, v)

    ##################
    # SEDONA
    ##################

    # Set sedona session
    spark = SedonaContext.create(builder.getOrCreate())
    sc = spark.sparkContext
    sc.setSystemProperty("sedona.global.charset", "utf8")

    # Set log
    sc.setLogLevel("ERROR")
    log4j = sc._jvm.org.apache.log4j
    log4j.LogManager.getRootLogger().setLevel(log4j.Level.ERROR)

    return spark

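A minimal sketch of building the expected configuration, assuming SPARK_CONFIG_KEY resolves to a section named "Spark" (the actual section name is defined in the package constants); all values are illustrative:

from configparser import ConfigParser

from multimno.core.spark_session import generate_spark_session

config = ConfigParser()
# Section name assumed to match SPARK_CONFIG_KEY; values are illustrative.
config["Spark"] = {
    "session_name": "multimno_pipeline",
    "spark.master": "local[*]",
    "spark.sql.shuffle.partitions": "8",
}
spark = generate_spark_session(config)

Every key other than session_name and spark.master is passed through unchanged as a configuration option on the session builder.
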
list_all_files_helper(path, fs, conf)

This function is used by list_all_files_recursively and should not be called elsewhere. Recursively traverses the file tree from the given path, collecting all files into a list, and returns it.

Parameters:

Name  Type        Description                  Default
path  JavaObject  Object from parent function  required
fs    JavaClass   Object from parent function  required
conf  JavaObject  Object from parent function  required

Returns:

Name  Type       Description
list  List[str]  List of all files in this folder and its subdirectories.

Source code in multimno/core/spark_session.py
def list_all_files_helper(
    path: py4j.java_gateway.JavaObject, fs: py4j.java_gateway.JavaClass, conf: py4j.java_gateway.JavaObject
) -> List[str]:
    """
    This function is used by list_all_files_recursively and should not be called elsewhere.
    Recursively traverses the file tree from the given path, collecting all files into a list, and returns it.

    Args:
        path (py4j.java_gateway.JavaObject): Object from parent function
        fs (py4j.java_gateway.JavaClass): Object from parent function
        conf (py4j.java_gateway.JavaObject): Object from parent function

    Returns:
        list: List of all files in this folder and its subdirectories.
    """
    files_list = []

    for f in fs.listStatus(path):
        if f.isDirectory():
            files_list.extend(list_all_files_helper(f.getPath(), fs, conf))
        else:
            files_list.append(str(f.getPath()))

    return files_list

list_all_files_recursively(spark, data_path)

If path is a file, returns a singleton list with this path. If path is a folder, returns a list of all files in this folder and any of its subfolders.

Parameters:

Name       Type          Description                     Default
spark      SparkSession  Currently active spark session  required
data_path  str           Path to list the files of       required

Returns:

Type       Description
List[str]  A list of all files in that folder and its subfolders

Source code in multimno/core/spark_session.py
def list_all_files_recursively(spark: SparkSession, data_path: str) -> List[str]:
    """
    If path is a file, returns a singleton list with this path.
    If path is a folder, returns a list of all files in this folder and any of its subfolders.

    Args:
        spark (SparkSession): Currently active spark session
        data_path (str): Path to list the files of

    Returns:
        List[str]: A list of all files in that folder and its subfolders
    """
    conf = spark._jsc.hadoopConfiguration()
    uri = spark._jvm.java.net.URI.create(data_path)
    fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(uri, conf)
    path = spark._jvm.org.apache.hadoop.fs.Path(data_path)
    return list_all_files_helper(path, fs, conf)

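A minimal usage sketch, assuming an active SparkSession named spark; the path is illustrative:

from multimno.core.spark_session import list_all_files_recursively

# Collect the full path of every file under the given directory.
files = list_all_files_recursively(spark, "/data/bronze/events")
for file_path in files:
    print(file_path)
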
list_parquet_partition_col_values(spark, data_path)

Lists all partition column values given a partitioned parquet folder.

Parameters:

Name       Type          Description                     Default
spark      SparkSession  Currently active spark session  required
data_path  str           Path of parquet                 required

Returns:

Type              Description
(str, List[str])  Name of the partition column and list of partition column values

Source code in multimno/core/spark_session.py
def list_parquet_partition_col_values(spark: SparkSession, data_path: str) -> List[str]:
    """
    Lists all partition column values given a partitioned parquet folder

    Args:
        spark (SparkSession): Currently active spark session
        data_path (str): Path of parquet

    Returns:
        str, List[str]: Name of partition column, List of partition col values
    """

    hadoop = spark._jvm.org.apache.hadoop
    fs = hadoop.fs.FileSystem
    conf = hadoop.conf.Configuration()
    path = hadoop.fs.Path(data_path)

    partitions = []
    for f in fs.get(conf).listStatus(path):
        if f.isDirectory():
            partitions.append(str(f.getPath().getName()))

    if len(partitions) == 0:
        return None, None

    partition_col = partitions[0].split("=")[0]

    partitions = [p.split("=")[1] for p in partitions]
    return partition_col, sorted(partitions)
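A minimal usage sketch, assuming an active SparkSession named spark and a dataset partitioned by a single column; the path and the example values are illustrative:

from multimno.core.spark_session import list_parquet_partition_col_values

# For a dataset written with partitionBy("date"), this could return
# ("date", ["2023-01-01", "2023-01-02"]); the values here are illustrative.
partition_col, values = list_parquet_partition_col_values(spark, "/data/silver/events")

If the folder contains no partition subdirectories, the function returns None, None.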