import ipaddress import logging import os import subprocess import shlex import shutil from subprocess import PIPE, DEVNULL, STDOUT, CalledProcessError from src.utils.fs import umount class ImageInfo: """ Class that stores the OpenGnsys partition image information. """ def __init__(self, filesystem=None, datasize=None): self.filesystem = filesystem self.datasize = datasize self.clonator = 'PARTCLONE' self.compressor = 'LZOP' def human_to_kb(size, unit): """ Returns the KB conversion of a human readable string size and unit. """ size = float(size.replace(',', '.')) if unit == 'GB': return size * 1000000 if unit == 'MB': return size * 1000 return 0 def fill_imageinfo(line, image_info): """ Updates ImageInfo object with information processed from a single partclone.info output line. ImageInfo object may not be updated if the line is invalid or does not contain filesystem or device size information. """ if 'File system' in line: filesystem = line.rstrip().split(' ')[-1] image_info.filesystem = filesystem elif 'Device size' in line: l = [word for word in line.rstrip().split(' ') if word][2:4] device_size = human_to_kb(*l) image_info.datasize = device_size def process_output_partcloneinfo(output): """ Parses image information from partclone.info output. Returns an ImageInfo object. """ image_info = ImageInfo() for n, line in enumerate(output.split('\n')): if n < 2: continue if image_info.datasize and image_info.filesystem: break fill_imageinfo(line, image_info) if not image_info.datasize: raise ValueError("Missing device size from partclone.info output") elif not image_info.filesystem: raise ValueError("Missing filesystem from partclone.info output") return image_info def process_image_partcloneinfo(filename): """ Decompress using lzop and executes partclone.info to fetch a partition image's information. Returns partclone.info stdout and stderr. """ cmd1 = f'{shutil.which("lzop")} -dc {filename}' cmd2 = f'{shutil.which("partclone.info")} -s -' args1 = shlex.split(cmd1) args2 = shlex.split(cmd2) p1 = subprocess.Popen(args1, stdout=PIPE, stderr=DEVNULL) p2 = subprocess.Popen(args2, stdout=PIPE, stdin=p1.stdout, stderr=STDOUT, encoding='utf-8') p1.stdout.close() p2_out, p2_err = p2.communicate() if p2.returncode != 0: raise ValueError('Unable to process image {filename}') return p2_out def ogGetImageInfo(filename): """ Obtain filesystem and device size information of an OpenGnsys partition image. This method supports compressed images with lzop that have been created with partclone. Returns an ImageInfo object. >>> image_info = ogGetImageInfo('/opt/opengnsys/images/foobar.img') >>> image_info.filesystem >>> 'NTFS' >>> image_info.datasize >>> 140000000 >>> image_info.compressor >>> 'LZOP' >>> image_info.clonator >>> 'PARTCLONE' """ out = process_image_partcloneinfo(filename) image_info = process_output_partcloneinfo(out) return image_info def cambiar_acceso(mode='rw', user='opengnsys', pwd='og'): """ 'CambiarAcceso' (admin/Interface/CambiarAcceso) rewrite into native Python. Remount the (remote) samba directory that contains the OpenGnsys images. Specify access mode ('rw', or 'ro') with mode parameter (default 'rw'). Specify samba credentials with user and pwd parameter. Return True if exit-code was 0. Return False otherwise. """ if mode not in ['rw', 'ro']: raise ValueError('Invalid remount mode option') cmd = shlex.split(f'mount -o remount,{mode},username={user},password={pwd} /opt/opengnsys/images') p = subprocess.run(cmd, stdout=DEVNULL, stderr=DEVNULL) return p.returncode == 0 def ogChangeRepo(ip, smb_user='opengnsys', smb_pass='og'): """ Umount current Samba directory of OpenGnsys images and mount new Samba directory (preserving previous access mode). If the mount fails, fallback to the image directory that was mounted before calling this function. If there is no previous image directory mount, then simply try mounting the new Samba directory with readonly mode option. Any CalledProcessError raised by the fallback mount subprocess should be handled by the caller of this function. """ def fsopts_mode(fsopts): for opt in fsopts.split(','): if opt in ['rw', 'ro']: return opt def process_mntent(line): name, mntdir, fstype, opts, freq, passno = line.split(' ') mode = fsopts_mode(opts) return name, mntdir, fsopts_mode(opts) try: ipaddr = ipaddress.ip_address(ip) except ValueError as e: raise ValueError('Invalid ip address') mounted = False with open('/etc/mtab') as f: for line in f: if 'ogimages' in line: orig_name, mntdir, mode = process_mntent(line) mounted = True break new_name = f'//{ip}/ogimages' if not mounted: orig_name = new_name mntdir = '/opt/opengnsys/images' mode = 'ro' else: umount(mntdir) cmd = f'mount.cifs -o {mode},username={smb_user},password={smb_pass} {new_name} /opt/opengnsys/images' try: result = subprocess.run(shlex.split(cmd), check=True) except CalledProcessError as e: logging.error(f'Error mounting new image directory: {e}') cmd = f'mount.cifs -o {mode},username={smb_user},password={smb_pass} {orig_name} /opt/opengnsys/images' result = subprocess.run(shlex.split(cmd), check=True) finally: return result def restoreImageCustom(repo_ip, image_name, disk, partition, method): """ """ if not shutil.which('restoreImageCustom'): logging.error('Invalid restoreImageCustom invocation') raise ValueError('Error: restoreImageCustom not found') if ogChangeRepo(repo).returncode != 0: logging.error('ogChangeRepo could not change repository to %s', repo) raise ValueError(f'Error: Cannot change repository to {repo}') cmd = f'restoreImageCustom {repo_ip} {image_name} {disk} {partition} {method}' with open('/tmp/command.log', 'wb', 0) as logfile: try: proc = subprocess.run(cmd, stdout=logfile, encoding='utf-8', shell=True) except: logging.error('Exception when running restoreImageCustom subprocess') raise ValueError('Error: Incorrect command value') return proc.returncode def configureOs(disk, partition): """ """ if shutil.which('configureOsCustom'): cmd_configure = f"configureOsCustom {disk} {partition}" else: cmd_configure = f"configureOs {disk} {partition}" try: proc = subprocess.run(cmd_configure, stdout=PIPE, encoding='utf-8', shell=True) out = proc.stdout except: logging.error('Exception when running configureOs subprocess') raise ValueError('Error: Incorrect command value') return out def ogCopyEfiBootLoader(disk, partition): cmd = f'ogCopyEfiBootLoader {disk} {partition}' try: proc = subprocess.run(cmd, shell=True) except: logging.error('Exception when running ogCopyEfiBootLoader subprocess') raise ValueError('Subprocess error: ogCopyEfiBootloader')