""" The script provides functionality: 1. Compress directory using ZSTD 2. Upload compressed file to S3 storage 3. Scheduled to run every day You can set some envs vars: - SCHEDULE - can be monthly, weekly, daily (by default), hourly - TIME_ZONE - timezone for datetime using in filenames, default is Europe/Tallinn - COMPRESSION_LEVEL - level of ZSTD compression, default is 10 - PREFIX - prefix for backup filename, default is empty - FOLDER_TO_BACKUP - folder which script will compress, default is /backup Settings for S3 storage: - AWS_S3_REGION_NAME - default nl-ams - AWS_S3_ENDPOINT_URL - default https://s3.nl-ams.scw.cloud - AWS_ACCESS_KEY_ID - AWS_SECRET_ACCESS_KEY - AWS_BUCKET_NAME """ import os import subprocess import threading import time from datetime import datetime import boto3 import pytz import schedule from boto3.exceptions import S3UploadFailedError SCHEDULE = os.getenv("SCHEDULE", "DAILY") TIME_ZONE = pytz.timezone(os.getenv("TIME_ZONE", "Europe/Tallinn")) PREFIX = os.getenv("PREFIX", "") FOLDER_TO_BACKUP = os.getenv("FOLDER_TO_BACKUP", "/backup") AWS_S3_REGION_NAME = os.getenv("AWS_S3_REGION_NAME", "nl-ams") AWS_S3_ENDPOINT_URL = os.getenv("AWS_S3_ENDPOINT_URL", "https://s3.nl-ams.scw.cloud") AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID") AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY") AWS_BUCKET_NAME = os.getenv("AWS_BUCKET_NAME") COMPRESSION_LEVEL = os.getenv("COMPRESSION_LEVEL", 10) def compress_folder_to_zstd(file_path: str) -> bool: try: process = subprocess.Popen( f"tar -c -P -I 'zstd -{COMPRESSION_LEVEL}' -f {file_path} {FOLDER_TO_BACKUP}", stdout=subprocess.PIPE, shell=True, ) process.communicate() if process.returncode == 2: print(f"Command failed. Return code : {process.returncode}") return False return True except (subprocess.SubprocessError, OSError) as exception: print(exception) return False def upload_to_s3(compressed_file_path: str, filename: str) -> bool: time_string = datetime.now(tz=TIME_ZONE).strftime("%Y/%m/%d") destination_folder = f"{time_string}/{filename}" try: s3_client = boto3.client( service_name="s3", region_name=AWS_S3_REGION_NAME, endpoint_url=AWS_S3_ENDPOINT_URL, aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY, ) s3_client.upload_file( Filename=compressed_file_path, Bucket=AWS_BUCKET_NAME, Key=destination_folder, ) os.remove(compressed_file_path) return True except S3UploadFailedError as exception: print(exception) return False def run_backup_files() -> None: """ Run backup script """ scheduled_time = datetime.now(tz=TIME_ZONE) filename = f"{scheduled_time:%y%m%d-%H%M}.tar.zst" if PREFIX: filename = f"{PREFIX}-{filename}" archive_file_path = f"/tmp/{filename}" backup_success = compress_folder_to_zstd(file_path=archive_file_path) if not backup_success: print("Backup failed") return backup_time = datetime.now(tz=TIME_ZONE) upload_success = upload_to_s3(archive_file_path, filename) if not upload_success: print("Upload failed") return upload_time = datetime.now(tz=TIME_ZONE) print( f"Started at {scheduled_time:%Y-%m-%d %H:%M:%S}; " f"Made backup at {backup_time:%Y-%m-%d %H:%M:%S}; " f"Uploaded to S3 at {upload_time:%Y-%m-%d %H:%M:%S}" ) def run_threaded(job_func): """ Run the jobs in threading :param job_func: :return: """ job_thread = threading.Thread(target=job_func) job_thread.start() match SCHEDULE: case "MONTHLY": print("Scheduled to run backup task every 4 weeks") schedule.every(4).weeks.do(run_threaded, run_backup_files) case "WEEKLY": print("Scheduled to run backup task every Monday") 
schedule.every().monday.at("02:00").do(run_threaded, run_backup_files) case "HOURLY": print("Scheduled to run backup task every hour") schedule.every().hour.at(":05").do(run_threaded, run_backup_files) # For any other values, incl DAILY - run daily case _: print("Scheduled to run backup task every day at 02:00") schedule.every().day.at("02:00").do(run_threaded, run_backup_files) # Run first job immediately schedule.run_all() while True: schedule.run_pending() time.sleep(1)