# Copyright (C) 2020-2024 Soleta Networks # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Affero General Public License as published by the # Free Software Foundation; either version 3 of the License, or # (at your option) any later version. import argparse from cli.config import cfg, OG_CLI_CFG_PATH from cli.utils import * import requests import shutil import sys import os class OgLive(): live_files = [ 'initrd.img', 'vmlinuz', 'filesystem.squashfs', ] tmp_extension = '.tmp' @staticmethod def _get_local_live_dir(): local_live_dir = cfg.get('local_live', '/var/www/html/ogrelive') if not local_live_dir: print(f'Error: local_live not defined in {OG_CLI_CFG_PATH}') return None if not os.path.isdir(local_live_dir): print(f'Warning: {local_live_dir} directoy does not exist, creating directory') try: os.makedirs(local_live_dir, exist_ok=True) except OSError as e: print(f'ERROR: Failed to create directory {local_live_dir}: {e}') return None return local_live_dir @staticmethod def _get_server_base_url(): server_live = cfg.get('server_live', 'https://opengnsys.soleta.eu/ogrelive') if not server_live: print(f'Error: server_live not defined in {OG_CLI_CFG_PATH}') return None return server_live def _is_live_in_server(live_name): server_live = OgLive._get_server_base_url() target_url = f'{server_live}/{live_name}/{OgLive.live_files[0]}' response = requests.head(target_url) return response.status_code == 200 @staticmethod def _delete_tmp_live_files(live_name): local_live_dir = OgLive._get_local_live_dir() if not local_live_dir: return 1 folder_path = os.path.join(local_live_dir, live_name) for file_name in os.listdir(folder_path): if not file_name.endswith(OgLive.tmp_extension): continue target_file = os.path.join(folder_path, file_name) try: os.remove(target_file) except OSError as e: print(f'ERROR: Failed to delete temporary file {target_file}: {e}') return 1 return 0 @staticmethod def _download_from_server(file_path, local_extension): live_file = os.path.basename(file_path) server_live = OgLive._get_server_base_url() if not server_live: return 1 file_url = f'{server_live}/{file_path}' local_live_dir = OgLive._get_local_live_dir() if not local_live_dir: return 1 local_path = os.path.join(local_live_dir, file_path) if local_extension: local_path += local_extension try: response = requests.get(file_url, stream=True, timeout=(5, None)) except requests.exceptions.RequestException as e: print(f'Request failed for {file_url}: {e}') return 1 if response.status_code == 200: try: with open(local_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) except OSError as e: print(f'File system error occurred: {e}') return 1 else: print(f'ERROR: Failed to download {live_file}. Status code: {response.status_code}') return 1 return 0 @staticmethod def list_live(rest, args): parser = argparse.ArgumentParser(prog='ogcli list live') parser.add_argument('--remote', action='store_true', help='(Optional) Obtain the list of the remote instead of the local lives') parsed_args = parser.parse_args(args) if parsed_args.remote: local_live_dir = OgLive._get_local_live_dir() if not local_live_dir: return 1 download_err = OgLive._download_from_server('ogrelive.json', local_extension=OgLive.tmp_extension) if download_err: OgLive._delete_tmp_live_files('') return 1 remote_json = os.path.join(local_live_dir, 'ogrelive.json') remote_json_tmp = remote_json + OgLive.tmp_extension try: shutil.move(remote_json_tmp, remote_json) except OSError as e: print(f'ERROR: cannot move {remote_json_tmp} into {remote_json}: {e}') return 1 try: with open(remote_json, 'r') as json_file: remote_data = json_file.read() except json.JSONDecodeError: print(f'ERROR: Failed parse malformed JSON file {remote_json}') return 1 except OSError as e: print(f'ERROR: cannot open {remote_json}: {e}') return 1 print_json(remote_data) return 0 res = rest.get('/oglive/list') if not res: return 1 print_json(res.text) return 0 @staticmethod def _is_same_checksum(file_path, checksum_path): local_checksum = compute_md5(file_path) try: with open(checksum_path, 'r') as f: remote_checksum = f.read().strip() except (FileNotFoundError, PermissionError, OSError) as e: print(f'ERROR: Cannot read checksum file for {live_file}: {e}') return False return local_checksum == remote_checksum @staticmethod def install_live(rest, args): parser = argparse.ArgumentParser(prog='ogcli install live') parser.add_argument('--name', nargs='?', required=True, help='Name of the center') parsed_args = parser.parse_args(args) live_name = parsed_args.name local_live_dir = OgLive._get_local_live_dir() if not local_live_dir: return 1 if not OgLive._is_live_in_server(live_name): print(f'{live_name} is not available on the server, it cannot be installed') return 1 local_dir = os.path.join(local_live_dir, live_name) if os.path.exists(local_dir): print(f'{live_name} already exists, checking for updates...') try: os.makedirs(local_dir, exist_ok=True) except OSError as e: print(f'ERROR: Failed to create directory {local_dir}: {e}') return 1 for live_file in OgLive.live_files: download_err = OgLive._download_from_server(os.path.join(live_name, live_file + '.full.sum'), local_extension=OgLive.tmp_extension) if download_err: OgLive._delete_tmp_live_files(live_name) return download_err file_path = os.path.join(local_dir, live_file) file_path_tmp = file_path + OgLive.tmp_extension checksum_path_tmp = file_path + '.full.sum' + OgLive.tmp_extension is_first_install = not os.path.exists(file_path) if is_first_install: print(f'Downloading {live_file}...') else: requires_update = not OgLive._is_same_checksum(file_path, checksum_path_tmp) if not requires_update: print(f'{live_file} is up-to-date, skipping') continue print(f'Updating {live_file}...') download_err = OgLive._download_from_server(os.path.join(live_name, live_file), local_extension=OgLive.tmp_extension) if download_err: OgLive._delete_tmp_live_files(live_name) return download_err if not OgLive._is_same_checksum(file_path_tmp, checksum_path_tmp): print(f'ERROR: Checksum mismatch for {live_file}') OgLive._delete_tmp_live_files(live_name) return 1 print(f'Checksum is OK for {live_file}') for file_name in os.listdir(local_dir): if not file_name.endswith(OgLive.tmp_extension): continue file_path_tmp = os.path.join(local_dir, file_name) file_path = file_path_tmp[:-len(OgLive.tmp_extension)] try: shutil.move(file_path_tmp, file_path) except OSError as e: print(f'ERROR: cannot move {src_file} into {target_file}: {e}') OgLive._delete_tmp_live_files(live_name) return 1 payload = {'name': live_name} res = rest.post('/oglive/add', payload=payload) if not res: return 1 return 0 @staticmethod def delete_live(rest, args): parser = argparse.ArgumentParser(prog='ogcli delete live') parser.add_argument('--name', nargs='?', required=True, help='Name of the center') parsed_args = parser.parse_args(args) live_name = parsed_args.name payload = {'name': live_name} res = rest.post('/oglive/delete', payload=payload) if not res: return 1 local_live_dir = OgLive._get_local_live_dir() if not local_live_dir: return 1 local_dir = os.path.join(local_live_dir, live_name) if os.path.exists(local_dir): try: shutil.rmtree(local_dir) except OSError as e: print(f'Error deleting directory {local_dir}: {e}') return 1 else: print(f'Error: no directory found for {live_name}') return 0 @staticmethod def set_live(rest, args): parser = argparse.ArgumentParser(prog='ogcli set live') parser.add_argument('--default', action='store_true', required=True, help='set the default live image') parser.add_argument('--name', nargs='?', required=True, help='Name of the live') parsed_args = parser.parse_args(args) live_name = parsed_args.name payload = {'name': live_name} res = rest.post('/oglive/default', payload=payload) if not res: return 1 return 0