1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
|
#
# Copyright (C) 2023 Soleta Networks <info@soleta.eu>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the
# Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
import ipaddress
import logging
import os
import stat
import subprocess
import shlex
import shutil
from subprocess import PIPE, DEVNULL, STDOUT, CalledProcessError
from src.utils.fs import umount
from src.log import OgError
class ImageInfo:
"""
Class that stores the OpenGnsys partition image information.
"""
def __init__(self, filesystem=None, datasize=None):
self.filesystem = filesystem
self.datasize = datasize
self.size = 0
self.mtime = 0
self.perms = 0
self.clonator = 'PARTCLONE'
self.compressor = 'LZOP'
self.checksum = 0
def human_to_kb(size, unit):
"""
Returns the KB conversion of a human readable string size and unit.
"""
size = float(size.replace(',', '.'))
if unit == 'GB':
return size * 1000000
if unit == 'MB':
return size * 1000
return 0
def fill_imageinfo(line, image_info):
"""
Updates ImageInfo object with information processed from
a single partclone.info output line.
ImageInfo object may not be updated if the line is invalid or does not
contain filesystem or device size information.
"""
if 'File system' in line:
filesystem = line.rstrip().split(' ')[-1]
image_info.filesystem = filesystem
elif 'Device size' in line:
l = [word for word in line.rstrip().split(' ') if word][2:4]
device_size = human_to_kb(*l)
image_info.datasize = device_size
def image_info_from_partclone(partclone_output):
"""
Return an ImageInfo object from partclone.info output.
"""
image_info = ImageInfo()
for n, line in enumerate(partclone_output.split('\n')):
# Ignore first two lines of partclone.info output
if n < 2:
continue
if image_info.datasize and image_info.filesystem:
break
fill_imageinfo(line, image_info)
if not image_info.datasize:
raise OgError("Missing device size from partclone.info output")
elif not image_info.filesystem:
raise OgError("Missing filesystem from partclone.info output")
return image_info
def run_lzop_partcloneinfo(image_path):
"""
Run lzop to decompress an OpenGnsys partition image, feed
lzop output to a partclone.info subprocess.
Return the partclone.info subprocess output.
"""
cmd1 = f'{shutil.which("lzop")} -dc {image_path}'
cmd2 = f'{shutil.which("partclone.info")} -s -'
args1 = shlex.split(cmd1)
args2 = shlex.split(cmd2)
p1 = subprocess.Popen(args1, stdout=PIPE, stderr=DEVNULL)
p2 = subprocess.Popen(args2, stdout=PIPE, stdin=p1.stdout,
stderr=STDOUT, encoding='utf-8')
p1.stdout.close()
p2_out, p2_err = p2.communicate()
if p2.returncode != 0:
raise OgError(f'Unable to process image {image_path}')
return p2_out
def get_image_info(image_path):
"""
Obtain filesystem and device size information of an OpenGnsys partition
image.
This method supports compressed images with lzop that have been created
with partclone.
Returns an ImageInfo object.
>>> image_info = get_image_info('/opt/opengnsys/images/foobar.img')
>>> image_info.filesystem
>>> 'NTFS'
>>> image_info.datasize
>>> 140000000
>>> image_info.compressor
>>> 'LZOP'
>>> image_info.clonator
>>> 'PARTCLONE'
"""
partclone_output = run_lzop_partcloneinfo(image_path)
image_info = image_info_from_partclone(partclone_output)
try:
st = os.stat(image_path)
image_info.size = st.st_size
image_info.perms = st.st_mode & (stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)
image_info.mtime = int(st.st_mtime)
except Exception as e:
logging.info(f'cannot retrieve stats from {image_path}: {e}')
return image_info
def change_access(mode='rw', user='opengnsys', pwd='og'):
"""
'CambiarAcceso' (admin/Interface/CambiarAcceso) rewrite into native Python.
Remount the (remote) samba directory that contains the OpenGnsys images.
Specify access mode ('rw', or 'ro') with mode parameter (default 'rw').
Specify samba credentials with user and pwd parameter.
Return 0 if exit-code was 0. Return -1 otherwise.
"""
assert mode in ['rw', 'ro'], 'Invalid remount mode option'
cmd = shlex.split(f'mount -o remount,{mode},username={user},password={pwd} /opt/opengnsys/images')
p = subprocess.run(cmd, stdout=DEVNULL, stderr=DEVNULL)
return 0 if p.returncode == 0 else -1
def ogChangeRepo(ip, smb_user='opengnsys', smb_pass='og'):
"""
Umount current Samba directory of OpenGnsys images and mount new Samba
directory (preserving previous access mode).
If the mount fails, fallback to the image directory that was mounted before
calling this function.
If there is no previous image directory mount, then simply try mounting the
new Samba directory with readonly mode option.
Any CalledProcessError raised by the fallback mount subprocess should be
handled by the caller of this function.
"""
def fsopts_mode(fsopts):
for opt in fsopts.split(','):
if opt in ['rw', 'ro']:
return opt
def process_mntent(line):
name, mntdir, fstype, opts, freq, passno = line.split(' ')
mode = fsopts_mode(opts)
return name, mntdir, fsopts_mode(opts)
try:
ipaddr = ipaddress.ip_address(ip)
except ValueError as e:
raise OgError(f'Invalid IP address {ip} received') from e
mounted = False
with open('/etc/mtab') as f:
for line in f:
if 'ogimages' in line:
orig_name, mntdir, mode = process_mntent(line)
mounted = True
break
new_name = f'//{ip}/ogimages'
if not mounted:
orig_name = new_name
mntdir = '/opt/opengnsys/images'
mode = 'ro'
else:
umount(mntdir)
err = 0
cmd = f'mount.cifs -o {mode},username={smb_user},password={smb_pass} {new_name} /opt/opengnsys/images'
result = subprocess.run(shlex.split(cmd))
if result.returncode != 0:
err = -1
logging.error(f'Error mounting {new_name} in /opt/opengnsys/images with error {result.returncode}')
cmd = f'mount.cifs -o {mode},username={smb_user},password={smb_pass} {orig_name} /opt/opengnsys/images'
subprocess.run(shlex.split(cmd))
return err
|