Processing compressed files with Apache Spark

Currently, I am working on a project that requires me to preprocess a lot of files. Each file is first compressed using lzma, then one or more compressed files are bundled together using zip. The file content is JSON.

Info: If you have the option to receive the files in a different format, I would recommend a more common one like Parquet. This makes the files easier to work with and avoids manual decompression. Even plain gzip would probably be better, since Spark has built-in support for it. However, if you are stuck with the current format, this post might help you.

The goal is to decompress the files, parse the JSON of each file, apply some transformations and save the result in a new file, without persisting any intermediate files.

The files are stored in an S3 bucket, so I will use the S3A connector for Apache Spark to read and write them.

Environment

Using the following conda environment:

name: spark-transform
channels:
  - defaults
  - https://repo.anaconda.com/pkgs/main
  - https://repo.anaconda.com/pkgs/r
dependencies:
  - _libgcc_mutex=0.1=main
  - _openmp_mutex=5.1=1_gnu
  - bzip2=1.0.8=h5eee18b_6
  - ca-certificates=2025.2.25=h06a4308_0
  - cairo=1.16.0=hb05425b_5
  - dbus=1.13.18=hb2f20db_0
  - expat=2.7.1=h6a678d5_0
  - fontconfig=2.14.1=h55d465d_3
  - freetype=2.13.3=h4a9f257_0
  - fribidi=1.0.10=h7b6447c_0
  - glib=2.78.4=h6a678d5_0
  - glib-tools=2.78.4=h6a678d5_0
  - graphite2=1.3.14=h295c915_1
  - harfbuzz=10.2.0=hf296adc_0
  - icu=73.1=h6a678d5_0
  - ld_impl_linux-64=2.40=h12ee557_0
  - libffi=3.4.4=h6a678d5_1
  - libgcc-ng=11.2.0=h1234567_1
  - libglib=2.78.4=hdc74915_0
  - libgomp=11.2.0=h1234567_1
  - libiconv=1.16=h5eee18b_3
  - libmpdec=4.0.0=h5eee18b_0
  - libpng=1.6.39=h5eee18b_0
  - libstdcxx-ng=11.2.0=h1234567_1
  - libuuid=1.41.5=h5eee18b_0
  - libxcb=1.17.0=h9b100fa_0
  - libxml2=2.13.8=hfdd30dd_0
  - ncurses=6.4=h6a678d5_0
  - openjdk=21.0.6=h38aa4c6_0
  - openssl=3.0.16=h5eee18b_0
  - pango=1.50.7=h0fee60c_1
  - pcre2=10.42=hebb0a14_1
  - pip=25.1=pyhc872135_2
  - pixman=0.40.0=h7f8727e_1
  - pthread-stubs=0.3=h0ce48e5_1
  - python=3.13.2=hf623796_100_cp313
  - python_abi=3.13=0_cp313
  - readline=8.2=h5eee18b_0
  - setuptools=78.1.1=py313h06a4308_0
  - sqlite=3.45.3=h5eee18b_0
  - tk=8.6.14=h39e8969_0
  - tzdata=2025b=h04d1e81_0
  - wheel=0.45.1=py313h06a4308_0
  - xorg-libxau=1.0.12=h9b100fa_0
  - xorg-libxdmcp=1.1.5=h9b100fa_0
  - xz=5.6.4=h5eee18b_1
  - zlib=1.2.13=h5eee18b_1
  - pip:
      - py4j==0.10.9.7
      - pyspark==3.5.5

Save it as environment.yml, then create the environment using the following command:

conda env create -f environment.yml

A small PoC

Creating a test file

First, we need to create a test file. The following code will create a zip file with two files inside. Each file contains a JSON object with a key and a value. The files are compressed using lzma and then zipped together.

import lzma
import zipfile
from io import BytesIO

filename = "test.zip"

contents = {
    "test1": "{\"key\": \"value1\"}",
    "test2": "{\"key\": \"value2\"}",
}

# Create a BytesIO stream to hold the zip file in memory
filestream = BytesIO()

zipf = zipfile.ZipFile(filestream, mode='w', compression=zipfile.ZIP_DEFLATED)
for content in contents:
    data = contents[content].encode()
    zipf.writestr(f"{content}.json", lzma.compress(data))

zipf.close()

filestream.seek(0)
with open(filename, 'wb') as f:
    f.write(filestream.read())
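
Before handing the archive to Spark, it can help to confirm that it really has the layout described above. The following is a minimal sketch using only the standard library. The helper names `build_archive` and `inspect_archive` are made up for this example, and the check relies on `lzma.compress` writing the .xz container by default.

```python
import lzma
import zipfile
from io import BytesIO

# Magic bytes that open every .xz stream; lzma.compress() uses this container by default.
XZ_MAGIC = b"\xfd7zXZ\x00"


def build_archive(contents):
    """Build the layout from above in memory: lzma-compressed members inside a zip."""
    stream = BytesIO()
    with zipfile.ZipFile(stream, mode="w", compression=zipfile.ZIP_DEFLATED) as zipf:
        for name, text in contents.items():
            zipf.writestr(f"{name}.json", lzma.compress(text.encode()))
    return stream.getvalue()


def inspect_archive(buffer):
    """Return {member name: decoded text}, checking that each member is an xz stream."""
    decoded = {}
    with zipfile.ZipFile(BytesIO(buffer)) as zipf:
        for name in zipf.namelist():
            raw = zipf.read(name)
            if not raw.startswith(XZ_MAGIC):
                raise ValueError(f"{name} is not an xz stream")
            decoded[name] = lzma.decompress(raw).decode()
    return decoded


archive = build_archive({"test1": '{"key": "value1"}', "test2": '{"key": "value2"}'})
for member, text in inspect_archive(archive).items():
    print(member, text)
```

If a member was added to the zip without the lzma step, `inspect_archive` raises a `ValueError` naming that member, which is easier to debug than a failure inside a Spark task.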

Loading the file

Using User-Defined Functions in Apache Spark, we can load the file and decompress it. The following code will read the zip file, decompress the files inside and parse the JSON content.

import lzma

from pyspark.sql import SparkSession


def unzip(buffer):
    import zipfile
    from io import BytesIO

    contents = []
    with zipfile.ZipFile(BytesIO(buffer), 'r') as z:
        for file in z.namelist():
            with z.open(file) as f:
                contents.append(
                    {
                        "file": file,
                        "content": lzma.decompress(f.read()).decode()
                    }
                )

    return contents


spark = SparkSession.builder.appName("Transform zip content").getOrCreate()
zipData = spark.read.format("binaryFile").load("./test.zip").cache()

# decompress the zip file
unzipUdf = spark.udf.register("unzip", unzip, "array<struct<file:string, content:string>>")
unzipData = zipData.selectExpr("path", "unzip(content) as content")

# split the content into separate columns
filesWithContent = unzipData.selectExpr("path", "explode(content) as content")
filesWithContent = filesWithContent.selectExpr("path", "content.file as file", "content.content as content")

# parse as json
jsonData = filesWithContent.selectExpr("path", "file", "from_json(content, 'struct<key:string>') as json")
jsonData = jsonData.selectExpr("path", "file", "json.key as key")

# print the results
jsonData.show(truncate=False)

spark.stop()

The code will output the following:

+------------------------+----------+------+
|path                    |file      |key   |
+------------------------+----------+------+
|file:/home/user/test.zip|test1.json|value1|
|file:/home/user/test.zip|test2.json|value2|
+------------------------+----------+------+
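
The `unzip` function above raises as soon as one member is not valid lzma or not valid UTF-8, and an exception inside a UDF fails the whole Spark task. One possible variation, sketched here as an assumption and not as part of the original pipeline, records the problem per member. The name `unzip_tolerant` and the extra `error` field are hypothetical; registering it as a UDF would need a matching return type such as `array<struct<file:string, content:string, error:string>>`.

```python
import lzma
import zipfile
from io import BytesIO


def unzip_tolerant(buffer):
    """Like unzip(), but report failures per member instead of raising."""
    try:
        archive = zipfile.ZipFile(BytesIO(buffer), "r")
    except zipfile.BadZipFile as exc:
        # The buffer is not a zip archive at all: return a single error record.
        return [{"file": None, "content": None, "error": f"not a zip archive: {exc}"}]

    results = []
    with archive:
        for name in archive.namelist():
            try:
                text = lzma.decompress(archive.read(name)).decode()
                results.append({"file": name, "content": text, "error": None})
            except (lzma.LZMAError, UnicodeDecodeError) as exc:
                # Keep the member name so the bad file can be traced later.
                results.append({"file": name, "content": None, "error": str(exc)})
    return results


# Small local check: one valid member and one that skipped the lzma step.
stream = BytesIO()
with zipfile.ZipFile(stream, "w") as zipf:
    zipf.writestr("good.json", lzma.compress(b'{"key": "value1"}'))
    zipf.writestr("bad.json", b"plain text, not lzma")

for record in unzip_tolerant(stream.getvalue()):
    print(record["file"], "ok" if record["error"] is None else "failed")
```

Because the function is plain Python, it can be exercised like this without starting a Spark session. Rows with a non-null `error` can then be filtered out or written to a separate location.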

In combination with S3

The following code will read the files from the S3 bucket, decompress them and parse the JSON content. The code below is not production code, but it should give you an idea of how to put everything together.

Don’t hardcode the credentials in the code. Use a different way to load them, like using environment variables or using a secrets manager.

import lzma

from pyspark.sql import SparkSession


def unzip(buffer):
    import zipfile
    from io import BytesIO

    contents = []
    with zipfile.ZipFile(BytesIO(buffer), 'r') as z:
        for file in z.namelist():
            # ... some logic removed
            with z.open(file) as f:
                contents.append(
                    {
                        "file": file,
                        "content": lzma.decompress(f.read()).decode()
                    }
                )

    return contents


spark = (SparkSession.builder.appName("Transform zip content")
         .config('spark.hadoop.fs.s3a.aws.credentials.provider',
                 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
         .config("spark.hadoop.fs.s3a.access.key", "ACCESS_KEY")
         .config("spark.hadoop.fs.s3a.secret.key", "SECRET_KEY")
         .config("spark.hadoop.fs.s3a.endpoint", "ENDPOINT")
         .config("spark.hadoop.fs.s3a.path.style.access", "true")
         .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
         .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")
         .getOrCreate())
# Wrap the chained call in parentheses so it can span several lines
zipData = (spark.read.format("binaryFile")
           .option("pathGlobFilter", "*.datapoint")
           .option("recursiveFileLookup", "true")
           .load("s3a://bucket/the/path/")
           .cache())

# decompress the zip file
unzipUdf = spark.udf.register("unzip", unzip, "array<struct<file:string, content:string>>")
unzipData = zipData.selectExpr("path", "unzip(content) as content")

# further processing...
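
One way to keep the keys out of the source is to read them from environment variables and turn them into the S3A options used above. This is only a sketch: `s3a_settings_from_env` is a made-up helper, `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` are the conventional AWS variable names, and `S3_ENDPOINT` is a name chosen for this example.

```python
import os

# Environment variable name -> Spark option it feeds (S3_ENDPOINT is a hypothetical name).
ENV_TO_OPTION = {
    "AWS_ACCESS_KEY_ID": "spark.hadoop.fs.s3a.access.key",
    "AWS_SECRET_ACCESS_KEY": "spark.hadoop.fs.s3a.secret.key",
    "S3_ENDPOINT": "spark.hadoop.fs.s3a.endpoint",
}


def s3a_settings_from_env(env=None):
    """Return the S3A options as a dict, failing early if a variable is missing."""
    env = os.environ if env is None else env
    missing = [name for name in ENV_TO_OPTION if name not in env]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return {option: env[name] for name, option in ENV_TO_OPTION.items()}


# Demo with a plain dict so the sketch runs without real credentials.
example_env = {
    "AWS_ACCESS_KEY_ID": "example-access-key",
    "AWS_SECRET_ACCESS_KEY": "example-secret-key",
    "S3_ENDPOINT": "http://localhost:9000",
}
for option in s3a_settings_from_env(example_env):
    print(option)
```

Each key and value in the returned dict can be passed to `.config(key, value)` on the session builder in place of the hardcoded `ACCESS_KEY`, `SECRET_KEY` and `ENDPOINT` strings.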

Additional notes

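You need to take into consideration the size of the files you are working with. The binaryFile source loads each zip file into a single row, and the UDF decompresses every member in memory, so a whole archive plus its decompressed content has to fit into the memory of one executor. If the files are too big for that, it might be better to extract / preprocess them using a different tool and then load the result into Spark.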
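
To judge whether file size is a concern, it helps to know how large the members become once decompressed. The sketch below measures this with the standard library by streaming each member in chunks, so the decompressed data is never held in memory at once. The helper name `decompressed_sizes` and the 1 MiB default chunk size are choices made for this example, and the zip buffer itself is still assumed to fit in memory.

```python
import lzma
import zipfile
from io import BytesIO


def decompressed_sizes(buffer, chunk_size=1024 * 1024):
    """Return {member name: decompressed byte count}, reading each member in chunks."""
    sizes = {}
    with zipfile.ZipFile(BytesIO(buffer)) as archive:
        for name in archive.namelist():
            total = 0
            # lzma.open accepts a file object, so the zip member is decoded as a stream.
            with archive.open(name) as member, lzma.open(member) as stream:
                while chunk := stream.read(chunk_size):
                    total += len(chunk)
            sizes[name] = total
    return sizes


# Demo: a highly compressible member that is tiny inside the zip but 3 MB decompressed.
demo = BytesIO()
with zipfile.ZipFile(demo, "w", compression=zipfile.ZIP_DEFLATED) as zipf:
    zipf.writestr("big.json", lzma.compress(b"x" * 3_000_000))

archive_bytes = demo.getvalue()
print("archive bytes:", len(archive_bytes))
print("decompressed:", decompressed_sizes(archive_bytes))
```

Running this over a sample of real archives gives a rough idea of how much memory a single row can need after the UDF has run.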