# Copyright (C) 2020-2024 Soleta Networks # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Affero General Public License as published by the # Free Software Foundation; either version 3 of the License, or # (at your option) any later version. import argparse from config import cfg, OG_CLI_CFG_PATH from cli.utils import * import requests import shutil import sys import os class OgLive(): live_files = [ 'oginitrd.img', 'ogvmlinuz', 'filesystem.squashfs', 'oginitrd.img.full.sum', 'ogvmlinuz.full.sum', 'filesystem.squashfs.full.sum' ] tmp_extension = '.tmp' @staticmethod def _get_local_live_path(): local_live = cfg.get('local_live', '/var/www/html/ogrelive') if not local_live: print(f'Error: local_live not defined in {OG_CLI_CFG_PATH}') return None if not os.path.isdir(local_live): print(f'Error: {local_live} directoy does not exist') return None return local_live @staticmethod def list_live(rest, args): parser = argparse.ArgumentParser(prog='ogcli list live') parser.add_argument('--remote', action='store_true', help='(Optional) Obtain the list of the remote instead of the local lives') parsed_args = parser.parse_args(args) if parsed_args.remote: local_live = OgLive._get_local_live_path() if not local_live: return 1 remote_json = os.path.join(local_live, 'ogrelive.json') try: with open(remote_json, 'r') as json_file: remote_data = json_file.read() except json.JSONDecodeError: print(f'ERROR: Failed parse malformed JSON file {remote_json}') return 1 except OSError as e: print(f'ERROR: cannot open {remote_json}: {e}') return 1 print_json(remote_data) return 0 res = rest.get('/oglive/list') if not res: return 1 print_json(res.text) return 0 @staticmethod def _delete_live_files(live_name, extension): local_live = OgLive._get_local_live_path() if not local_live: return 1 ret = 0 folder_path = os.path.join(local_live, live_name) for live_file in OgLive.live_files: target_file = os.path.join(folder_path, live_file) + extension try: os.remove(target_file) except OSError as e: print(f'Error deleting {target_file}: {e}') ret = 1 print(f'Successfully deleted {target_file}') return ret @staticmethod def install_live(rest, args): parser = argparse.ArgumentParser(prog='ogcli install live') parser.add_argument('--name', nargs='?', required=True, help='Name of the center') parsed_args = parser.parse_args(args) live_name = parsed_args.name server_live = cfg.get('server_live', 'https://opengnsys.soleta.eu/ogrelive') local_live = OgLive._get_local_live_path() if not local_live: return 1 local_dir = os.path.join(local_live, live_name) if os.path.exists(local_dir): print(f'Local files for {live_name} found, updating files...') try: os.makedirs(local_dir, exist_ok=True) except OSError as e: print(f'ERROR: Failed to create directory {local_dir}: {e}') return 1 # File download remote_dir = f'{server_live}/{live_name}' for live_file in OgLive.live_files: file_url = f'{remote_dir}/{live_file}' local_file = os.path.join(local_dir, live_file) response = requests.get(file_url, stream=True, timeout=(5, None)) if response.status_code == 200: with open(local_file + OgLive.tmp_extension, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) print(f'Successfully downloaded {live_file}') else: print(f'ERROR: Failed to download {live_file}. Status code: {response.status_code}') OgLive._delete_live_files(live_name, OgLive.tmp_extension) return 1 # Checksum verification check_files = [ os.path.join(local_dir, 'oginitrd.img'), os.path.join(local_dir, 'ogvmlinuz'), os.path.join(local_dir, 'filesystem.squashfs'), ] for check_file in check_files: local_checksum = compute_md5(check_file + OgLive.tmp_extension) try: with open(check_file + '.full.sum' + OgLive.tmp_extension, 'r') as f: remote_checksum = f.read().strip() except (FileNotFoundError, PermissionError, OSError) as e: print(f'ERROR: Cannot read checksum file {check_file}.full.sum: {e}') OgLive._delete_live_files(live_name, OgLive.tmp_extension) return -1 if local_checksum != remote_checksum: print(f'ERROR: Checksum mismatch for {check_file}') OgLive._delete_live_files(live_name, OgLive.tmp_extension) return -1 print(f'Checksum is OK for {check_file}') # move tmp files for live_file in OgLive.live_files: target_file = os.path.join(local_dir, live_file) src_file = target_file + OgLive.tmp_extension shutil.move(src_file, target_file) payload = {'name': live_name} res = rest.post('/oglive/add', payload=payload) if not res: return 1 return 0 @staticmethod def delete_live(rest, args): parser = argparse.ArgumentParser(prog='ogcli delete live') parser.add_argument('--name', nargs='?', required=True, help='Name of the center') parsed_args = parser.parse_args(args) live_name = parsed_args.name payload = {'name': live_name} res = rest.post('/oglive/delete', payload=payload) if not res: return 1 local_live = OgLive._get_local_live_path() if not local_live: return 1 local_dir = os.path.join(local_live, live_name) if os.path.exists(local_dir): shutil.rmtree(local_dir) return 0