diff options
author | Jose M. Guisado <jguisado@soleta.eu> | 2021-09-01 13:13:39 +0200 |
---|---|---|
committer | OpenGnSys Support Team <soporte-og@soleta.eu> | 2021-09-01 13:29:58 +0200 |
commit | 0c00f64669bfbdb6bde8e5cb7cfc205dec284e50 (patch) | |
tree | 3439f9c75282c0fa4e8cfe4ac88bc0fdd422b292 | |
parent | 082079ad78979efc163f785811dfd56deb078059 (diff) |
#1059 virtual: replace qmp polling for event listening
Polling for a qmp port availability is undesirable, as QEMU only handles
one connection to the qmp port at a time, ogClient may interfere with
cloneer-manager.
Check vm thread now connects to a separate qmp tcp socket, listening for
a shutdown guest event.
When ogClient is run just after ogVDI installation (before guest
installation) it will try to connect until it's possible, ie: after an
iso is specified and a qemu vm is started that exposes the appropiate
qmp tcp port.
-rw-r--r-- | src/virtual/ogOperations.py | 21 | ||||
-rw-r--r-- | src/virtual/poweroffd.py | 53 | ||||
-rw-r--r-- | src/virtual/qmp.py | 256 |
3 files changed, 322 insertions, 8 deletions
diff --git a/src/virtual/ogOperations.py b/src/virtual/ogOperations.py index 5da735e..a3f94a8 100644 --- a/src/virtual/ogOperations.py +++ b/src/virtual/ogOperations.py @@ -7,6 +7,7 @@ # (at your option) any later version. from src.ogRest import ThreadState +from src.virtual import poweroffd import socket import errno import select @@ -223,15 +224,19 @@ class OgVirtualOperations: return installed_os def check_vm_state_loop(self, ogRest): - POLLING_WAIT_TIME = 12 + # If we can't connect, wait until it's possible. while True: - time.sleep(POLLING_WAIT_TIME) - state = self.check_vm_state() - installed_os = self.get_installed_os() - if state == OgVM.State.STOPPED and \ - ogRest.state == ThreadState.IDLE and \ - len(installed_os) > 0: - self.poweroff_host() + try: + with socket.create_connection((poweroffd.QMP_DEFAULT_HOST, + poweroffd.QMP_DEFAULT_PORT)): + break + except ConnectionRefusedError: + time.sleep(1) + + qmpconn = poweroffd.init() + if poweroffd.run(qmpconn) < 0: + return + self.poweroff_host() def shellrun(self, request, ogRest): return diff --git a/src/virtual/poweroffd.py b/src/virtual/poweroffd.py new file mode 100644 index 0000000..63d08e1 --- /dev/null +++ b/src/virtual/poweroffd.py @@ -0,0 +1,53 @@ +# +# Copyright (C) 2021 Soleta Networks <info@soleta.eu> +# +# This program is free software: you can redistribute it and/or modify it under +# the terms of the GNU Affero General Public License as published by the +# Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. + + +from src.virtual.qmp import QEMUMonitorProtocol +from src.virtual.qmp import QMPCapabilitiesError, QMPConnectError + +QMP_DEFAULT_PORT = 4445 +QMP_DEFAULT_HOST = "127.0.0.1" + +def is_shutdown_event(qmp_ev): + """ + """ + return qmp_ev.get('event') == 'SHUTDOWN' + + +def init(host=QMP_DEFAULT_HOST, port=QMP_DEFAULT_PORT): + """ + """ + qmpconn = QEMUMonitorProtocol((host, port)) + try: + qmpconn.connect() + except ConnectionRefusedError: + print("Critical err: Connection refused") + return None + except QMPCapabilitiesError as e: + print("Error negotiating capabilities") + return None + return qmpconn + + +def run(qmpconn): + """ + """ + while(True): + try: + qmp_ev = qmpconn.pull_event(wait=True) + except QMPConnectError as e: + print("Error trying to pull an event") + ret = -1 + break + if is_shutdown_event(qmp_ev): + print("Detected guest shutdown, let's go") + ret = 0 + break + + qmpconn.close() + return ret diff --git a/src/virtual/qmp.py b/src/virtual/qmp.py new file mode 100644 index 0000000..5c8cf6a --- /dev/null +++ b/src/virtual/qmp.py @@ -0,0 +1,256 @@ +# QEMU Monitor Protocol Python class +# +# Copyright (C) 2009, 2010 Red Hat Inc. +# +# Authors: +# Luiz Capitulino <lcapitulino@redhat.com> +# +# This work is licensed under the terms of the GNU GPL, version 2. See +# the COPYING file in the top-level directory. + +import json +import errno +import socket +import logging + + +class QMPError(Exception): + pass + + +class QMPConnectError(QMPError): + pass + + +class QMPCapabilitiesError(QMPError): + pass + + +class QMPTimeoutError(QMPError): + pass + + +class QEMUMonitorProtocol(object): + + #: Logger object for debugging messages + logger = logging.getLogger('QMP') + #: Socket's error class + error = socket.error + #: Socket's timeout + timeout = socket.timeout + + def __init__(self, address, server=False): + """ + Create a QEMUMonitorProtocol class. + + @param address: QEMU address, can be either a unix socket path (string) + or a tuple in the form ( address, port ) for a TCP + connection + @param server: server mode listens on the socket (bool) + @raise socket.error on socket connection errors + @note No connection is established, this is done by the connect() or + accept() methods + """ + self.__events = [] + self.__address = address + self.__sock = self.__get_sock() + self.__sockfile = None + if server: + self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.__sock.bind(self.__address) + self.__sock.listen(1) + + def __get_sock(self): + if isinstance(self.__address, tuple): + family = socket.AF_INET + else: + family = socket.AF_UNIX + return socket.socket(family, socket.SOCK_STREAM) + + def __negotiate_capabilities(self): + greeting = self.__json_read() + if greeting is None or "QMP" not in greeting: + raise QMPConnectError + # Greeting seems ok, negotiate capabilities + resp = self.cmd('qmp_capabilities') + if "return" in resp: + return greeting + raise QMPCapabilitiesError + + def __json_read(self, only_event=False): + while True: + data = self.__sockfile.readline() + if not data: + return + resp = json.loads(data) + if 'event' in resp: + self.logger.debug("<<< %s", resp) + self.__events.append(resp) + if not only_event: + continue + return resp + + def __get_events(self, wait=False): + """ + Check for new events in the stream and cache them in __events. + + @param wait (bool): block until an event is available. + @param wait (float): If wait is a float, treat it as a timeout value. + + @raise QMPTimeoutError: If a timeout float is provided and the timeout + period elapses. + @raise QMPConnectError: If wait is True but no events could be + retrieved or if some other error occurred. + """ + + # Check for new events regardless and pull them into the cache: + self.__sock.setblocking(0) + try: + self.__json_read() + except socket.error as err: + if err[0] == errno.EAGAIN: + # No data available + pass + self.__sock.setblocking(1) + + # Wait for new events, if needed. + # if wait is 0.0, this means "no wait" and is also implicitly false. + if not self.__events and wait: + if isinstance(wait, float): + self.__sock.settimeout(wait) + try: + ret = self.__json_read(only_event=True) + except socket.timeout: + raise QMPTimeoutError("Timeout waiting for event") + except: + raise QMPConnectError("Error while reading from socket") + if ret is None: + raise QMPConnectError("Error while reading from socket") + self.__sock.settimeout(None) + + def connect(self, negotiate=True): + """ + Connect to the QMP Monitor and perform capabilities negotiation. + + @return QMP greeting dict + @raise socket.error on socket connection errors + @raise QMPConnectError if the greeting is not received + @raise QMPCapabilitiesError if fails to negotiate capabilities + """ + self.__sock.connect(self.__address) + self.__sockfile = self.__sock.makefile() + if negotiate: + return self.__negotiate_capabilities() + + def accept(self): + """ + Await connection from QMP Monitor and perform capabilities negotiation. + + @return QMP greeting dict + @raise socket.error on socket connection errors + @raise QMPConnectError if the greeting is not received + @raise QMPCapabilitiesError if fails to negotiate capabilities + """ + self.__sock.settimeout(15) + self.__sock, _ = self.__sock.accept() + self.__sockfile = self.__sock.makefile() + return self.__negotiate_capabilities() + + def cmd_obj(self, qmp_cmd): + """ + Send a QMP command to the QMP Monitor. + + @param qmp_cmd: QMP command to be sent as a Python dict + @return QMP response as a Python dict or None if the connection has + been closed + """ + self.logger.debug(">>> %s", qmp_cmd) + try: + self.__sock.sendall(json.dumps(qmp_cmd).encode('utf-8')) + except socket.error as err: + if err[0] == errno.EPIPE: + return + raise socket.error(err) + resp = self.__json_read() + self.logger.debug("<<< %s", resp) + return resp + + def cmd(self, name, args=None, cmd_id=None): + """ + Build a QMP command and send it to the QMP Monitor. + + @param name: command name (string) + @param args: command arguments (dict) + @param cmd_id: command id (dict, list, string or int) + """ + qmp_cmd = {'execute': name} + if args: + qmp_cmd['arguments'] = args + if cmd_id: + qmp_cmd['id'] = cmd_id + return self.cmd_obj(qmp_cmd) + + def command(self, cmd, **kwds): + """ + Build and send a QMP command to the monitor, report errors if any + """ + ret = self.cmd(cmd, kwds) + if "error" in ret: + raise Exception(ret['error']['desc']) + return ret['return'] + + def pull_event(self, wait=False): + """ + Pulls a single event. + + @param wait (bool): block until an event is available. + @param wait (float): If wait is a float, treat it as a timeout value. + + @raise QMPTimeoutError: If a timeout float is provided and the timeout + period elapses. + @raise QMPConnectError: If wait is True but no events could be + retrieved or if some other error occurred. + + @return The first available QMP event, or None. + """ + self.__get_events(wait) + + if self.__events: + return self.__events.pop(0) + return None + + def get_events(self, wait=False): + """ + Get a list of available QMP events. + + @param wait (bool): block until an event is available. + @param wait (float): If wait is a float, treat it as a timeout value. + + @raise QMPTimeoutError: If a timeout float is provided and the timeout + period elapses. + @raise QMPConnectError: If wait is True but no events could be + retrieved or if some other error occurred. + + @return The list of available QMP events. + """ + self.__get_events(wait) + return self.__events + + def clear_events(self): + """ + Clear current list of pending events. + """ + self.__events = [] + + def close(self): + self.__sock.close() + self.__sockfile.close() + + def settimeout(self, timeout): + self.__sock.settimeout(timeout) + + def get_sock_fd(self): + return self.__sock.fileno() + + def is_scm_available(self): + return self.__sock.family == socket.AF_UNIX |