# # Copyright (C) 2023 Soleta Networks # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Affero General Public License as published by the # Free Software Foundation; either version 3 of the License, or # (at your option) any later version. import ipaddress import logging import os import subprocess import shlex import shutil from subprocess import PIPE, DEVNULL, STDOUT, CalledProcessError from src.utils.fs import umount from src.log import OgError class ImageInfo: """ Class that stores the OpenGnsys partition image information. """ def __init__(self, filesystem=None, datasize=None): self.filesystem = filesystem self.datasize = datasize self.size = 0 self.mtime = 0 self.perms = 0 self.clonator = 'PARTCLONE' self.compressor = 'LZOP' def human_to_kb(size, unit): """ Returns the KB conversion of a human readable string size and unit. """ size = float(size.replace(',', '.')) if unit == 'GB': return size * 1000000 if unit == 'MB': return size * 1000 return 0 def fill_imageinfo(line, image_info): """ Updates ImageInfo object with information processed from a single partclone.info output line. ImageInfo object may not be updated if the line is invalid or does not contain filesystem or device size information. """ if 'File system' in line: filesystem = line.rstrip().split(' ')[-1] image_info.filesystem = filesystem elif 'Device size' in line: l = [word for word in line.rstrip().split(' ') if word][2:4] device_size = human_to_kb(*l) image_info.datasize = device_size def image_info_from_partclone(partclone_output): """ Return an ImageInfo object from partclone.info output. """ image_info = ImageInfo() for n, line in enumerate(partclone_output.split('\n')): # Ignore first two lines of partclone.info output if n < 2: continue if image_info.datasize and image_info.filesystem: break fill_imageinfo(line, image_info) if not image_info.datasize: raise OgError("Missing device size from partclone.info output") elif not image_info.filesystem: raise OgError("Missing filesystem from partclone.info output") return image_info def run_lzop_partcloneinfo(image_path): """ Run lzop to decompress an OpenGnsys partition image, feed lzop output to a partclone.info subprocess. Return the partclone.info subprocess output. """ cmd1 = f'{shutil.which("lzop")} -dc {image_path}' cmd2 = f'{shutil.which("partclone.info")} -s -' args1 = shlex.split(cmd1) args2 = shlex.split(cmd2) p1 = subprocess.Popen(args1, stdout=PIPE, stderr=DEVNULL) p2 = subprocess.Popen(args2, stdout=PIPE, stdin=p1.stdout, stderr=STDOUT, encoding='utf-8') p1.stdout.close() p2_out, p2_err = p2.communicate() if p2.returncode != 0: raise OgError(f'Unable to process image {image_path}') return p2_out def ogGetImageInfo(image_path): """ Obtain filesystem and device size information of an OpenGnsys partition image. This method supports compressed images with lzop that have been created with partclone. Returns an ImageInfo object. >>> image_info = ogGetImageInfo('/opt/opengnsys/images/foobar.img') >>> image_info.filesystem >>> 'NTFS' >>> image_info.datasize >>> 140000000 >>> image_info.compressor >>> 'LZOP' >>> image_info.clonator >>> 'PARTCLONE' """ partclone_output = run_lzop_partcloneinfo(image_path) image_info = image_info_from_partclone(partclone_output) return image_info def change_access(mode='rw', user='opengnsys', pwd='og'): """ 'CambiarAcceso' (admin/Interface/CambiarAcceso) rewrite into native Python. Remount the (remote) samba directory that contains the OpenGnsys images. Specify access mode ('rw', or 'ro') with mode parameter (default 'rw'). Specify samba credentials with user and pwd parameter. Return 0 if exit-code was 0. Return -1 otherwise. """ assert mode in ['rw', 'ro'], 'Invalid remount mode option' cmd = shlex.split(f'mount -o remount,{mode},username={user},password={pwd} /opt/opengnsys/images') p = subprocess.run(cmd, stdout=DEVNULL, stderr=DEVNULL) return 0 if p.returncode == 0 else -1 def ogChangeRepo(ip, smb_user='opengnsys', smb_pass='og'): """ Umount current Samba directory of OpenGnsys images and mount new Samba directory (preserving previous access mode). If the mount fails, fallback to the image directory that was mounted before calling this function. If there is no previous image directory mount, then simply try mounting the new Samba directory with readonly mode option. Any CalledProcessError raised by the fallback mount subprocess should be handled by the caller of this function. """ def fsopts_mode(fsopts): for opt in fsopts.split(','): if opt in ['rw', 'ro']: return opt def process_mntent(line): name, mntdir, fstype, opts, freq, passno = line.split(' ') mode = fsopts_mode(opts) return name, mntdir, fsopts_mode(opts) try: ipaddr = ipaddress.ip_address(ip) except ValueError as e: raise OgError(f'Invalid IP address {ip} received') from e mounted = False with open('/etc/mtab') as f: for line in f: if 'ogimages' in line: orig_name, mntdir, mode = process_mntent(line) mounted = True break new_name = f'//{ip}/ogimages' if not mounted: orig_name = new_name mntdir = '/opt/opengnsys/images' mode = 'ro' else: umount(mntdir) err = 0 cmd = f'mount.cifs -o {mode},username={smb_user},password={smb_pass} {new_name} /opt/opengnsys/images' result = subprocess.run(shlex.split(cmd)) if result.returncode != 0: err = -1 logging.error(f'Error mounting {new_name} in /opt/opengnsys/images with error {result.returncode}') cmd = f'mount.cifs -o {mode},username={smb_user},password={smb_pass} {orig_name} /opt/opengnsys/images' subprocess.run(shlex.split(cmd)) return err def restoreImageCustom(repo_ip, image_name, disk, partition, method): """ """ if not shutil.which('restoreImageCustom'): raise OgError('restoreImageCustom not found') cmd = f'restoreImageCustom {repo_ip} {image_name} {disk} {partition} {method}' with open('/tmp/command.log', 'wb', 0) as logfile: try: proc = subprocess.run(cmd, stdout=logfile, encoding='utf-8', shell=True, check=True) except OSError as e: raise OgError(f'Error processing restoreImageCustom: {e}') from e return proc.returncode def configureOs(disk, partition): """ """ if shutil.which('configureOsCustom'): cmd_configure = f"configureOsCustom {disk} {partition}" else: cmd_configure = f"configureOs {disk} {partition}" try: proc = subprocess.run(cmd_configure, stdout=PIPE, encoding='utf-8', shell=True, check=True) out = proc.stdout except OSError as e: raise OgError(f'Error processing configureOsCustom: {e}') from e return out