#
# Copyright (C) 2023 Soleta Networks <info@soleta.eu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
import ipaddress
import logging
import os
import subprocess
import shlex
import shutil
from subprocess import PIPE, DEVNULL, STDOUT, CalledProcessError
from src.utils.fs import umount
class ImageInfo:
    """
    Plain container for the metadata of an OpenGnsys partition image.

    Only the filesystem and the data size can be supplied when the object is
    created; every other attribute starts with a fixed default value.
    """

    def __init__(self, filesystem=None, datasize=None):
        # Caller-provided values (both optional).
        self.filesystem, self.datasize = filesystem, datasize
        # Numeric attributes all start at zero.
        self.size = self.mtime = self.perms = 0
        # Default cloning tool and compressor names.
        self.clonator, self.compressor = 'PARTCLONE', 'LZOP'
def human_to_kb(size, unit):
    """
    Returns the KB conversion of a human readable string size and unit.

    size is the number as text; a decimal comma is accepted ('1,5' is read
    as 1.5). unit is one of 'TB', 'GB', 'MB' or 'KB', interpreted as decimal
    multiples (1 MB == 1000 KB).

    Returns the size in KB as a float, or 0 if the unit is not recognised.
    Raises ValueError if size cannot be parsed as a number.
    """
    kb_per_unit = {
        'TB': 1000000000,
        'GB': 1000000,
        'MB': 1000,
        'KB': 1,
    }
    # Parse first so that a malformed number is reported even when the unit
    # is unknown.
    value = float(size.replace(',', '.'))
    factor = kb_per_unit.get(unit)
    if factor is None:
        return 0
    return value * factor
def fill_imageinfo(line, image_info):
    """
    Updates ImageInfo object with information processed from
    a single partclone.info output line.

    A line containing 'File system' sets image_info.filesystem to its last
    word. A line containing 'Device size' sets image_info.datasize to the
    KB conversion of its third and fourth words (number and unit).

    ImageInfo object is left untouched if the line is invalid (size or unit
    missing, size not a number) or does not contain filesystem or device
    size information.
    """
    if 'File system' in line:
        # The filesystem name is the last word, e.g. "File system:  NTFS".
        image_info.filesystem = line.split()[-1]
    elif 'Device size' in line:
        # Words 3 and 4 hold the size and its unit,
        # e.g. "Device size:  140.0 GB = 34179687 Blocks".
        size_and_unit = line.split()[2:4]
        if len(size_and_unit) != 2:
            # Truncated line: nothing usable to convert.
            return
        try:
            image_info.datasize = human_to_kb(*size_and_unit)
        except ValueError:
            # The size field is not a number: treat the line as invalid.
            return
def image_info_from_partclone(partclone_output):
    """
    Build an ImageInfo object out of the text printed by partclone.info.

    Lines are fed one by one to fill_imageinfo until both the data size and
    the filesystem are known.

    Raises ValueError when the device size or the filesystem could not be
    obtained from the output.
    """
    info = ImageInfo()
    # The first two lines of partclone.info output are skipped.
    for text in partclone_output.split('\n')[2:]:
        if info.datasize and info.filesystem:
            # Everything needed is already known; stop scanning.
            break
        fill_imageinfo(text, info)

    if not info.datasize:
        raise ValueError("Missing device size from partclone.info output")
    if not info.filesystem:
        raise ValueError("Missing filesystem from partclone.info output")
    return info
def run_lzop_partcloneinfo(image_path):
    """
    Run lzop to decompress an OpenGnsys partition image, feed
    lzop output to a partclone.info subprocess.

    Return the partclone.info subprocess output (stdout and stderr merged,
    decoded as UTF-8).

    Raises FileNotFoundError if lzop or partclone.info cannot be found in
    PATH, and ValueError if partclone.info exits with a non-zero code.
    """
    lzop = shutil.which('lzop')
    partclone_info = shutil.which('partclone.info')
    for tool, path in (('lzop', lzop), ('partclone.info', partclone_info)):
        if path is None:
            raise FileNotFoundError(f'{tool} not found in PATH')

    # Argument lists are built directly (no string + shlex round-trip) so an
    # image path containing spaces or quotes reaches lzop unchanged.
    # The context managers wait for both children, so no zombie is left.
    with subprocess.Popen([lzop, '-dc', image_path],
                          stdout=PIPE, stderr=DEVNULL) as p1:
        with subprocess.Popen([partclone_info, '-s', '-'],
                              stdout=PIPE, stdin=p1.stdout,
                              stderr=STDOUT, encoding='utf-8') as p2:
            # Drop our copy of the pipe so lzop receives SIGPIPE if
            # partclone.info exits before reading the whole stream.
            p1.stdout.close()
            p2_out, _ = p2.communicate()

    if p2.returncode != 0:
        raise ValueError(f'Unable to process image {image_path}')
    return p2_out
def ogGetImageInfo(image_path):
    """
    Return an ImageInfo object describing the filesystem and device size of
    an OpenGnsys partition image.

    The image is expected to have been created with partclone and
    compressed with lzop: it is piped through lzop and partclone.info and
    the resulting text is parsed.

    >>> image_info = ogGetImageInfo('/opt/opengnsys/images/foobar.img')
    >>> image_info.filesystem
    'NTFS'
    >>> image_info.datasize
    140000000.0
    >>> image_info.compressor
    'LZOP'
    >>> image_info.clonator
    'PARTCLONE'
    """
    return image_info_from_partclone(run_lzop_partcloneinfo(image_path))
def change_access(mode='rw', user='opengnsys', pwd='og'):
    """
    'CambiarAcceso' (admin/Interface/CambiarAcceso) rewrite into native Python.

    Remount the (remote) samba directory that contains the OpenGnsys images.

    Specify access mode ('rw', or 'ro') with mode parameter (default 'rw').
    Specify samba credentials with user and pwd parameter.

    Return 0 if exit-code was 0. Return -1 otherwise.
    Raises ValueError if mode is neither 'rw' nor 'ro'.
    """
    # Explicit check instead of assert: asserts are removed under python -O.
    if mode not in ('rw', 'ro'):
        raise ValueError('Invalid remount mode option')

    # The option string is passed as a single argv element so credentials
    # containing spaces or quotes are not re-tokenised.
    options = f'remount,{mode},username={user},password={pwd}'
    cmd = ['mount', '-o', options, '/opt/opengnsys/images']
    p = subprocess.run(cmd, stdout=DEVNULL, stderr=DEVNULL)
    return 0 if p.returncode == 0 else -1
def ogChangeRepo(ip, smb_user='opengnsys', smb_pass='og'):
    """
    Umount current Samba directory of OpenGnsys images and mount new Samba
    directory (preserving previous access mode).

    The current mount is looked up in /etc/mtab (first entry containing
    'ogimages'). If mounting the new share fails, the error is logged and
    the share that was mounted before calling this function is mounted
    again.

    If there is no previous image directory mount, the new Samba directory
    is mounted with readonly mode option; on failure the same share is
    tried a second time.

    Return 0 if the new share was mounted, -1 otherwise. A failure of the
    fallback mount is logged, not raised.
    Raises ValueError if ip is not a valid IP address.
    """
    images_dir = '/opt/opengnsys/images'

    def fsopts_mode(fsopts):
        # First 'rw'/'ro' flag of a comma separated option list, else None.
        for opt in fsopts.split(','):
            if opt in ('rw', 'ro'):
                return opt
        return None

    def process_mntent(line):
        # mtab entry: name, mount dir, fs type, options, freq, passno.
        name, mntdir, _fstype, opts = line.split()[:4]
        return name, mntdir, fsopts_mode(opts)

    def mount_cifs(share, mode):
        # Options travel as one argv element so credentials with spaces or
        # quotes are not re-tokenised.
        options = f'{mode},username={smb_user},password={smb_pass}'
        return subprocess.run(['mount.cifs', '-o', options, share, images_dir])

    try:
        ipaddress.ip_address(ip)
    except ValueError as e:
        raise ValueError(f'Invalid IP address {ip} received') from e

    previous = None
    with open('/etc/mtab') as f:
        for line in f:
            if 'ogimages' in line:
                previous = process_mntent(line)
                break

    new_name = f'//{ip}/ogimages'
    if previous is None:
        orig_name, mode = new_name, 'ro'
    else:
        orig_name, mntdir, mode = previous
        # Fall back to read-only if the entry lists neither 'rw' nor 'ro'.
        mode = mode or 'ro'
        umount(mntdir)

    err = 0
    result = mount_cifs(new_name, mode)
    if result.returncode != 0:
        err = -1
        logging.error(f'Error mounting {new_name} in /opt/opengnsys/images with error {result.returncode}')
        fallback = mount_cifs(orig_name, mode)
        if fallback.returncode != 0:
            logging.error(f'Error mounting fallback {orig_name} in /opt/opengnsys/images with error {fallback.returncode}')

    return err
def restoreImageCustom(repo_ip, image_name, disk, partition, method):
    """
    Run the external 'restoreImageCustom' command with the given arguments,
    writing its standard output to /tmp/command.log.

    Return the exit code of the command (0, because a non-zero exit raises).

    Raises OSError if 'restoreImageCustom' is not found in PATH or the
    command could not be launched, and subprocess.CalledProcessError if it
    exits with a non-zero code.
    """
    if not shutil.which('restoreImageCustom'):
        raise OSError('restoreImageCustom not found')

    # Every argument is shell-quoted: the command line is interpreted by a
    # shell, so unquoted spaces or metacharacters would be re-tokenised or
    # executed.
    # NOTE(review): shell=True is kept from the original; confirm whether the
    # command can be run from an argv list before dropping it.
    args = (repo_ip, image_name, disk, partition, method)
    quoted = ' '.join(shlex.quote(str(arg)) for arg in args)
    cmd = f'restoreImageCustom {quoted}'

    with open('/tmp/command.log', 'wb', 0) as logfile:
        try:
            proc = subprocess.run(cmd,
                                  stdout=logfile,
                                  encoding='utf-8',
                                  shell=True,
                                  check=True)
        except OSError as e:
            raise OSError(f'Error processing restoreImageCustom: {e}') from e
    return proc.returncode
def configureOs(disk, partition):
    """
    Run 'configureOsCustom' if it is found in PATH, 'configureOs' otherwise,
    passing disk and partition as arguments.

    Return the standard output of the command as text.

    Raises OSError if the command could not be launched and
    subprocess.CalledProcessError if it exits with a non-zero code.
    """
    tool = 'configureOsCustom' if shutil.which('configureOsCustom') else 'configureOs'
    # Arguments are shell-quoted because the command line goes through a
    # shell.
    # NOTE(review): shell=True is kept from the original; confirm whether the
    # command can be run from an argv list before dropping it.
    cmd_configure = f'{tool} {shlex.quote(str(disk))} {shlex.quote(str(partition))}'

    try:
        proc = subprocess.run(cmd_configure,
                              stdout=PIPE,
                              encoding='utf-8',
                              shell=True,
                              check=True)
    except OSError as e:
        # Report the command that was actually run.
        raise OSError(f'Error processing {tool}: {e}') from e
    return proc.stdout
def ogCopyEfiBootLoader(disk, partition):
    """
    Run the 'ogCopyEfiBootLoader' command through the shell, passing disk
    and partition as arguments.

    Returns nothing.

    Raises OSError if the command could not be launched and
    subprocess.CalledProcessError if it exits with a non-zero code.
    """
    # Arguments are shell-quoted because the command line goes through a
    # shell.
    # NOTE(review): shell=True is kept from the original; presumably the
    # command may only be resolvable by the shell - confirm before changing.
    cmd = f'ogCopyEfiBootLoader {shlex.quote(str(disk))} {shlex.quote(str(partition))}'
    try:
        subprocess.run(cmd,
                       shell=True,
                       check=True)
    except OSError as e:
        raise OSError(f'Error processing ogCopyEfiBootLoader: {e}') from e
|