This post describes how you can use PySpark aes_encrypt() function to encrypt sensitive columns when ingesting data. It is part of a series that shows how column-level encryption can be deployed at scale using AWS Glue, AWS KMS and Amazon Athena or Amazon Redshift.

Introduction

In an era where data breaches are increasingly common, securing sensitive data is not just a best practice but a necessity. As Werner Vogels, Amazon’s CTO, wisely put it: “Dance Like Nobody’s Watching. Encrypt Like Everyone Is.” In this series, we explore how to deploy column-level encryption at scale using an integration of PySpark, AWS Glue, AWS KMS, and Amazon Redshift or Amazon Athena.

Why Column-Level Encryption?

Column-level encryption allows for fine-grained control over data access. It’s particularly useful in scenarios where different groups require access to specific data subsets. This post serves as an introductory guide, focusing on the implementation of PySpark’s aes_encrypt() function for encrypting sensitive data during ingestion.

Setting the Stage

Before diving into the encryption process, it’s important to understand the relevant technologies:

  • PySpark: A Python API for Apache Spark, used for large-scale data processing.
  • AWS Glue: A serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development.
  • AWS KMS: AWS Key Management Service, a managed service that makes it easy to create and manage cryptographic keys.
  • Amazon Athena: An interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL.
  • Amazon Redshift: A fast, fully managed, petabyte-scale data warehouse service that makes it simple and cost-effective to efficiently analyze all your data.

Getting Started

Prepare Your Environment

Read Sample Data

  • Generate / Upload a CSV file containing a data sample to work with to the notebook. I provide a sample file in the provided GitHub repository: CSV file.
  • Read the sample data using the code below.
import base64 as b64
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession.builder.appName("MyApp").getOrCreate()

df = spark.read.option("header", True).csv("sample-pci-data.csv")
df.show()

You should see something similar to the below.

+-------------------+-----------+-------------------+
|First and Last Name|        SSN| Credit Card Number|
+-------------------+-----------+-------------------+
|      Robert Aragon|489-36-8350|4929-3813-3266-4295|
|      Ashley Borden|514-14-8905|5370-4638-8881-3020|
...
only showing top 2 rows

Understanding aes_encrypt()

The aes_encrypt() function in PySpark 3.3 offers AES encryption with various configurations. Read more about it here.

Below is the syntax of the function. It uses AES encryption with 16, 24, or 32-bit keys in ECB, GCM, or CBC modes with appropriate padding. IVs are optional for CBC and GCM modes and are auto-generated if absent. For GCM, optional AAD must match during encryption and decryption. GCM is the default mode.

def aes_encrypt(
    input: "ColumnOrName",
    key: "ColumnOrName",
    mode: Optional["ColumnOrName"] = None,
    padding: Optional["ColumnOrName"] = None,
    iv: Optional["ColumnOrName"] = None,
    aad: Optional["ColumnOrName"] = None,
) -> Column:

The following exanple shows the encryption and decryption of the string Spark SQL using the key abcdefghijklmnop which is a 16 characters long = 16 bytes = 128 bits key.

df = spark.createDataFrame([(
    "Spark SQL", "abcdefghijklmnop",)],
    ["input", "key"]
)

df.select(aes_decrypt(
    unbase64(base64(aes_encrypt(df.input, df.key))), df.key
).cast("STRING").alias('r')).collect()

[Row(r='Spark SQL')]

Generating an Encryption Key

  • Use AWS KMS’s GenerateDataKey API for a robust key generation. This requires an existing KMS key.
    • To generate the key using the AWS CLI generate-data-key command, use the example shown below. Make sure to replace AWS_PROFILE and KMS_KEY_ID with the correct values.
    aws kms generate-data-key --profile AWS_PROFILE --key-id KMS_KEY_ID --key-spec AES_256
    
    The response shown below contains the key in Plaintext base64-encoded format and in encrypted (CiphertextBlob) format.
    {
        "CiphertextBlob": "AQIDAHjhvhi8z21Psjp5qjRibLXgHGkMMP/BmNWdIKTuZnLdNwHHYQjpmQo0oyk5rk07mjZTAAAAfjB8BgkqhkiG9w0BBwagbzBtAgEAMGgGCSqGSIb3DQEHATAeBglghkgBZQMEAS4wEQQM6uOfZ7iyJ2UztfidAgEQgDuFQ5fajsvnhfaCmN/q8kk1JinY7gHqT/Bz9W3RxqkEv5ggPZKELcQtqNbRYdEzSwKPDTs+Grp0RtRqWw==",
        "Plaintext": "tEZPizBEj5EG5IDY1SvAECa5yZa5fVP1SrJGsGimx9I=",
        "KeyId": "KMS_KEY_ID"
    }
    
    • If you generate the data key using boto3 kms client, you will get a byte string, similar to b"\xc27~\x15\xc7\x9a\x8a|\xb48\\\xd7\x894g-v\xac\xb5\n%\x17\x96g\xab\x88\x8a;|bU/". This value can be used as-is with the aes_encrypt() function.
  • Alternatively, derive a key from a string using PBKDF2. (Link to code sample)

Encrypting Sensitive Columns

We’ll encrypt the “SSN” and “Credit Card Number” columns as shown below.

encryption_key = "tEZPizBEj5EG5IDY1SvAECa5yZa5fVP1SrJGsGimx9I="
# Decode the key from Base64 
decoded_key = b64.b64decode(encryption_key)
# Encrypt two columns at the "same time" using the same key
df_encrypted = df.withColumn('SSN_Encrypted', base64(expr(f"aes_encrypt(SSN, unhex('{decoded_key.hex()}'), 'GCM')")))\ 
                 .withColumn('CreditCardNumber_Encrypted', base64(expr(f"aes_encrypt(Credit_Card_Number, unhex('{decoded_key.hex()}'), 'GCM')")))
df_encrypted.show(truncate=True)

# +----------------+-----------+-------------------+--------------------+--------------------------+
# | First_Last_Name|        SSN| Credit_Card_Number|       SSN_Encrypted|CreditCardNumber_Encrypted|
# +----------------+-----------+-------------------+--------------------+--------------------------+
# |   Robert Aragon|489-36-8350|4929-3813-3266-4295|HI39DkVYN9WLTWAH3...|      XqvsWgRty1CBkJ7c9...|
# |   Ashley Borden|514-14-8905|5370-4638-8881-3020|5ME9bErsff7Zhzw5z...|      mhizKvf5053KqStxp...|
#  ...
# only showing top 2 rows

# Output path to store the new CSV
output_path = "./encrypted/sample-pci-data-encrypted.csv"
df_encrypted.write.csv(path=output_path, mode="overwrite", header=True, sep=",")

Decryption Process

Here’s how you can decrypt the data using the same key.

df = spark.read.option("header", True).csv("encrypted/sample-pci-data-encrypted.csv")
df.show()
df_decrypted = df.withColumn('SSN_Decrypted', expr(f"aes_decrypt(unbase64(SSN_Encrypted), unhex('{decoded_key.hex()}'), 'GCM')").cast("STRING"))
                 .withColumn('CC_Decrypted', expr(f"aes_decrypt(unbase64(CreditCardNumber_Encrypted), unhex('{decoded_key.hex()}'), 'GCM')").cast("STRING"))

# +----------------+-----------+-------------------+--------------------+--------------------------+
# | First_Last_Name|        SSN| Credit_Card_Number|       SSN_Encrypted|CreditCardNumber_Encrypted|
# +----------------+-----------+-------------------+--------------------+--------------------------+
# |   Robert Aragon|489-36-8350|4929-3813-3266-4295|Ft4y7DmP8xN8QtdHn...|      4f7Usr2dvVWbG26Bd...|
# |   Ashley Borden|514-14-8905|5370-4638-8881-3020|nsdeq//on5T/L64Ow...|      8LN7pIP1pa7Nmoma1...|
# only showing top 2 rows

# +----------------+-----------+-------------------+--------------------+--------------------------+-------------+-------------------+
# | First_Last_Name|        SSN| Credit_Card_Number|       SSN_Encrypted|CreditCardNumber_Encrypted|SSN_Decrypted|       CC_Decrypted|
# +----------------+-----------+-------------------+--------------------+--------------------------+-------------+-------------------+
# |   Robert Aragon|489-36-8350|4929-3813-3266-4295|Ft4y7DmP8xN8QtdHn...|      4f7Usr2dvVWbG26Bd...|  489-36-8350|4929-3813-3266-4295|
# |   Ashley Borden|514-14-8905|5370-4638-8881-3020|nsdeq//on5T/L64Ow...|      8LN7pIP1pa7Nmoma1...|  514-14-8905|5370-4638-8881-3020|
# only showing top 2 rows

Conclusion

In this post, I walked through the steps of encrypting and decrypting sensitive data columns using PySpark in a Jupyter Notebook. This setup serves as a foundational step towards a more complex and scalable data ingestion and consumption model, which I’ll explore in upcoming posts, including the integration of AWS Glue and AWS KMS.

Stay tuned for further insights on scaling this approach and managing encryption keys with AWS KMS!