summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJose M. Guisado <jguisado@soleta.eu>2021-09-01 13:13:39 +0200
committerOpenGnSys Support Team <soporte-og@soleta.eu>2021-09-01 13:29:58 +0200
commit0c00f64669bfbdb6bde8e5cb7cfc205dec284e50 (patch)
tree3439f9c75282c0fa4e8cfe4ac88bc0fdd422b292
parent082079ad78979efc163f785811dfd56deb078059 (diff)
#1059 virtual: replace qmp polling for event listening
Polling for a qmp port availability is undesirable, as QEMU only handles one connection to the qmp port at a time, ogClient may interfere with cloneer-manager. Check vm thread now connects to a separate qmp tcp socket, listening for a shutdown guest event. When ogClient is run just after ogVDI installation (before guest installation) it will try to connect until it's possible, ie: after an iso is specified and a qemu vm is started that exposes the appropiate qmp tcp port.
-rw-r--r--src/virtual/ogOperations.py21
-rw-r--r--src/virtual/poweroffd.py53
-rw-r--r--src/virtual/qmp.py256
3 files changed, 322 insertions, 8 deletions
diff --git a/src/virtual/ogOperations.py b/src/virtual/ogOperations.py
index 5da735e..a3f94a8 100644
--- a/src/virtual/ogOperations.py
+++ b/src/virtual/ogOperations.py
@@ -7,6 +7,7 @@
# (at your option) any later version.
from src.ogRest import ThreadState
+from src.virtual import poweroffd
import socket
import errno
import select
@@ -223,15 +224,19 @@ class OgVirtualOperations:
return installed_os
def check_vm_state_loop(self, ogRest):
- POLLING_WAIT_TIME = 12
+ # If we can't connect, wait until it's possible.
while True:
- time.sleep(POLLING_WAIT_TIME)
- state = self.check_vm_state()
- installed_os = self.get_installed_os()
- if state == OgVM.State.STOPPED and \
- ogRest.state == ThreadState.IDLE and \
- len(installed_os) > 0:
- self.poweroff_host()
+ try:
+ with socket.create_connection((poweroffd.QMP_DEFAULT_HOST,
+ poweroffd.QMP_DEFAULT_PORT)):
+ break
+ except ConnectionRefusedError:
+ time.sleep(1)
+
+ qmpconn = poweroffd.init()
+ if poweroffd.run(qmpconn) < 0:
+ return
+ self.poweroff_host()
def shellrun(self, request, ogRest):
return
diff --git a/src/virtual/poweroffd.py b/src/virtual/poweroffd.py
new file mode 100644
index 0000000..63d08e1
--- /dev/null
+++ b/src/virtual/poweroffd.py
@@ -0,0 +1,53 @@
+#
+# Copyright (C) 2021 Soleta Networks <info@soleta.eu>
+#
+# This program is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the
+# Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+
+
+from src.virtual.qmp import QEMUMonitorProtocol
+from src.virtual.qmp import QMPCapabilitiesError, QMPConnectError
+
+QMP_DEFAULT_PORT = 4445
+QMP_DEFAULT_HOST = "127.0.0.1"
+
+def is_shutdown_event(qmp_ev):
+ """
+ """
+ return qmp_ev.get('event') == 'SHUTDOWN'
+
+
+def init(host=QMP_DEFAULT_HOST, port=QMP_DEFAULT_PORT):
+ """
+ """
+ qmpconn = QEMUMonitorProtocol((host, port))
+ try:
+ qmpconn.connect()
+ except ConnectionRefusedError:
+ print("Critical err: Connection refused")
+ return None
+ except QMPCapabilitiesError as e:
+ print("Error negotiating capabilities")
+ return None
+ return qmpconn
+
+
+def run(qmpconn):
+ """
+ """
+ while(True):
+ try:
+ qmp_ev = qmpconn.pull_event(wait=True)
+ except QMPConnectError as e:
+ print("Error trying to pull an event")
+ ret = -1
+ break
+ if is_shutdown_event(qmp_ev):
+ print("Detected guest shutdown, let's go")
+ ret = 0
+ break
+
+ qmpconn.close()
+ return ret
diff --git a/src/virtual/qmp.py b/src/virtual/qmp.py
new file mode 100644
index 0000000..5c8cf6a
--- /dev/null
+++ b/src/virtual/qmp.py
@@ -0,0 +1,256 @@
+# QEMU Monitor Protocol Python class
+#
+# Copyright (C) 2009, 2010 Red Hat Inc.
+#
+# Authors:
+# Luiz Capitulino <lcapitulino@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+
+import json
+import errno
+import socket
+import logging
+
+
+class QMPError(Exception):
+ pass
+
+
+class QMPConnectError(QMPError):
+ pass
+
+
+class QMPCapabilitiesError(QMPError):
+ pass
+
+
+class QMPTimeoutError(QMPError):
+ pass
+
+
+class QEMUMonitorProtocol(object):
+
+ #: Logger object for debugging messages
+ logger = logging.getLogger('QMP')
+ #: Socket's error class
+ error = socket.error
+ #: Socket's timeout
+ timeout = socket.timeout
+
+ def __init__(self, address, server=False):
+ """
+ Create a QEMUMonitorProtocol class.
+
+ @param address: QEMU address, can be either a unix socket path (string)
+ or a tuple in the form ( address, port ) for a TCP
+ connection
+ @param server: server mode listens on the socket (bool)
+ @raise socket.error on socket connection errors
+ @note No connection is established, this is done by the connect() or
+ accept() methods
+ """
+ self.__events = []
+ self.__address = address
+ self.__sock = self.__get_sock()
+ self.__sockfile = None
+ if server:
+ self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ self.__sock.bind(self.__address)
+ self.__sock.listen(1)
+
+ def __get_sock(self):
+ if isinstance(self.__address, tuple):
+ family = socket.AF_INET
+ else:
+ family = socket.AF_UNIX
+ return socket.socket(family, socket.SOCK_STREAM)
+
+ def __negotiate_capabilities(self):
+ greeting = self.__json_read()
+ if greeting is None or "QMP" not in greeting:
+ raise QMPConnectError
+ # Greeting seems ok, negotiate capabilities
+ resp = self.cmd('qmp_capabilities')
+ if "return" in resp:
+ return greeting
+ raise QMPCapabilitiesError
+
+ def __json_read(self, only_event=False):
+ while True:
+ data = self.__sockfile.readline()
+ if not data:
+ return
+ resp = json.loads(data)
+ if 'event' in resp:
+ self.logger.debug("<<< %s", resp)
+ self.__events.append(resp)
+ if not only_event:
+ continue
+ return resp
+
+ def __get_events(self, wait=False):
+ """
+ Check for new events in the stream and cache them in __events.
+
+ @param wait (bool): block until an event is available.
+ @param wait (float): If wait is a float, treat it as a timeout value.
+
+ @raise QMPTimeoutError: If a timeout float is provided and the timeout
+ period elapses.
+ @raise QMPConnectError: If wait is True but no events could be
+ retrieved or if some other error occurred.
+ """
+
+ # Check for new events regardless and pull them into the cache:
+ self.__sock.setblocking(0)
+ try:
+ self.__json_read()
+ except socket.error as err:
+ if err[0] == errno.EAGAIN:
+ # No data available
+ pass
+ self.__sock.setblocking(1)
+
+ # Wait for new events, if needed.
+ # if wait is 0.0, this means "no wait" and is also implicitly false.
+ if not self.__events and wait:
+ if isinstance(wait, float):
+ self.__sock.settimeout(wait)
+ try:
+ ret = self.__json_read(only_event=True)
+ except socket.timeout:
+ raise QMPTimeoutError("Timeout waiting for event")
+ except:
+ raise QMPConnectError("Error while reading from socket")
+ if ret is None:
+ raise QMPConnectError("Error while reading from socket")
+ self.__sock.settimeout(None)
+
+ def connect(self, negotiate=True):
+ """
+ Connect to the QMP Monitor and perform capabilities negotiation.
+
+ @return QMP greeting dict
+ @raise socket.error on socket connection errors
+ @raise QMPConnectError if the greeting is not received
+ @raise QMPCapabilitiesError if fails to negotiate capabilities
+ """
+ self.__sock.connect(self.__address)
+ self.__sockfile = self.__sock.makefile()
+ if negotiate:
+ return self.__negotiate_capabilities()
+
+ def accept(self):
+ """
+ Await connection from QMP Monitor and perform capabilities negotiation.
+
+ @return QMP greeting dict
+ @raise socket.error on socket connection errors
+ @raise QMPConnectError if the greeting is not received
+ @raise QMPCapabilitiesError if fails to negotiate capabilities
+ """
+ self.__sock.settimeout(15)
+ self.__sock, _ = self.__sock.accept()
+ self.__sockfile = self.__sock.makefile()
+ return self.__negotiate_capabilities()
+
+ def cmd_obj(self, qmp_cmd):
+ """
+ Send a QMP command to the QMP Monitor.
+
+ @param qmp_cmd: QMP command to be sent as a Python dict
+ @return QMP response as a Python dict or None if the connection has
+ been closed
+ """
+ self.logger.debug(">>> %s", qmp_cmd)
+ try:
+ self.__sock.sendall(json.dumps(qmp_cmd).encode('utf-8'))
+ except socket.error as err:
+ if err[0] == errno.EPIPE:
+ return
+ raise socket.error(err)
+ resp = self.__json_read()
+ self.logger.debug("<<< %s", resp)
+ return resp
+
+ def cmd(self, name, args=None, cmd_id=None):
+ """
+ Build a QMP command and send it to the QMP Monitor.
+
+ @param name: command name (string)
+ @param args: command arguments (dict)
+ @param cmd_id: command id (dict, list, string or int)
+ """
+ qmp_cmd = {'execute': name}
+ if args:
+ qmp_cmd['arguments'] = args
+ if cmd_id:
+ qmp_cmd['id'] = cmd_id
+ return self.cmd_obj(qmp_cmd)
+
+ def command(self, cmd, **kwds):
+ """
+ Build and send a QMP command to the monitor, report errors if any
+ """
+ ret = self.cmd(cmd, kwds)
+ if "error" in ret:
+ raise Exception(ret['error']['desc'])
+ return ret['return']
+
+ def pull_event(self, wait=False):
+ """
+ Pulls a single event.
+
+ @param wait (bool): block until an event is available.
+ @param wait (float): If wait is a float, treat it as a timeout value.
+
+ @raise QMPTimeoutError: If a timeout float is provided and the timeout
+ period elapses.
+ @raise QMPConnectError: If wait is True but no events could be
+ retrieved or if some other error occurred.
+
+ @return The first available QMP event, or None.
+ """
+ self.__get_events(wait)
+
+ if self.__events:
+ return self.__events.pop(0)
+ return None
+
+ def get_events(self, wait=False):
+ """
+ Get a list of available QMP events.
+
+ @param wait (bool): block until an event is available.
+ @param wait (float): If wait is a float, treat it as a timeout value.
+
+ @raise QMPTimeoutError: If a timeout float is provided and the timeout
+ period elapses.
+ @raise QMPConnectError: If wait is True but no events could be
+ retrieved or if some other error occurred.
+
+ @return The list of available QMP events.
+ """
+ self.__get_events(wait)
+ return self.__events
+
+ def clear_events(self):
+ """
+ Clear current list of pending events.
+ """
+ self.__events = []
+
+ def close(self):
+ self.__sock.close()
+ self.__sockfile.close()
+
+ def settimeout(self, timeout):
+ self.__sock.settimeout(timeout)
+
+ def get_sock_fd(self):
+ return self.__sock.fileno()
+
+ def is_scm_available(self):
+ return self.__sock.family == socket.AF_UNIX