Coverage for drivers/blktap2 : 22%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
#!/usr/bin/env python # # Copyright (C) Citrix Systems Inc. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published # by the Free Software Foundation; version 2.1 only. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public License # along with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA # # blktap2: blktap/tapdisk management layer #
# For RRDD Plugin Registration
self.sock = socket(AF_UNIX, SOCK_STREAM) self.sock.connect(SOCKPATH)
return UnixStreamHTTP(SOCKPATH) # overridden, but prevents IndexError
self.lock.acquire() try: try: ret = op(self, *args) except (util.CommandException, util.SMException, XenAPI.Failure), e: util.logException("BLKTAP2:%s" % op) msg = str(e) if isinstance(e, util.CommandException): msg = "Command %s failed (%s): %s" % \ (e.cmd, e.code, e.reason) if override: raise xs_errors.XenError(excType, opterr=msg) else: raise except: util.logException("BLKTAP2:%s" % op) raise finally: self.lock.release() return ret
attempt = 0
while True: attempt += 1
try: return f(*__t, **__d)
except self.TransientFailure, e: e = e.exception
if attempt >= self.limit: raise e
time.sleep(self.backoff)
self.exception = exception
"""Tapdisk IPC utility calls."""
self.cmd = cmd self._p = p self.stdout = p.stdout
"""TapCtl cmd failure."""
self.cmd = cmd self.info = info
items = self.info.iteritems() info = ", ".join("%s=%s" % item for item in items) return "%s failed: %s" % (self.cmd, info)
# Trying to get a non-existent attribute throws an AttributeError # exception if self.info.has_key(key): return self.info[key] return object.__getattribute__(self, key)
# Retrieves the error code returned by the command. If the error code # was not supplied at object-construction time, zero is returned. key = 'status' if self.info.has_key(key): return self.info[key] else: return 0
def __mkcmd_real(cls, args): return [ cls.PATH ] + map(str, args)
def _mkcmd(cls, args):
    """
    Build the command line for @args using the currently installed
    one-shot builder, resetting the builder to the default beforehand
    so that the override applies to this invocation only.
    """
    builder = cls.__next_mkcmd
    cls.__next_mkcmd = cls.__mkcmd_real
    return builder(args)
""" Fail next invocation with @status. If @prev is true, execute the original command """
__prev_mkcmd = cls.__next_mkcmd
@classmethod def __mkcmd(cls, args): if prev: cmd = __prev_mkcmd(args) cmd = "'%s' && exit %d" % ("' '".join(cmd), status) else: cmd = "exit %d" % status
return [ '/bin/sh', '-c', cmd ]
cls.__next_mkcmd = __mkcmd
def strace(cls):
    """
    Run next invocation through strace.
    Output goes to /tmp/tap-ctl.<sm-pid>.<n>; <n> counts invocations.
    """
    # Whatever builder is installed right now gets wrapped, not replaced.
    wrapped_mkcmd = cls.__next_mkcmd

    @classmethod
    def traced_mkcmd(cls, args):
        # pylint: disable = E1101
        base_cmd = wrapped_mkcmd(args)
        tracefile = "/tmp/%s.%d.%d" % (os.path.basename(cls.PATH),
                                       os.getpid(),
                                       cls.__strace_n)
        cls.__strace_n += 1
        return ['/usr/bin/strace', '-o', tracefile, '--'] + base_cmd

    cls.__next_mkcmd = traced_mkcmd
""" Spawn a tap-ctl process. Return a TapCtl invocation. Raises a TapCtl.CommandFailure if subprocess creation failed. """ cmd = cls._mkcmd(args)
if not quiet: util.SMlog(cmd) try: p = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, close_fds=True) if input: p.stdin.write(input) p.stdin.close() except OSError, e: raise cls.CommandFailure(cmd, errno=e.errno)
return cls(cmd, p)
output = map(str.rstrip, self._p.stderr) return "; ".join(output)
""" Reap the child tap-ctl process of this invocation. Raises a TapCtl.CommandFailure on non-zero exit status. """ status = self._p.wait() if not quiet: util.SMlog(" = %d" % status)
if status == 0: return
info = { 'errmsg' : self._errmsg(), 'pid' : self._p.pid }
if status < 0: info['signal'] = -status else: info['status'] = status
raise self.CommandFailure(self.cmd, **info)
""" Spawn a tap-ctl invocation and read a single line. """ tapctl = cls._call(args=args, quiet=quiet, input=input)
output = tapctl.stdout.readline().rstrip()
tapctl._wait(quiet) return output
def _maybe(opt, parm): if parm is not None: return [ opt, parm ] return []
args = [ "list" ] args += cls._maybe("-m", minor) args += cls._maybe("-p", pid) args += cls._maybe("-t", _type) args += cls._maybe("-f", path)
tapctl = cls._call(args, True)
for line in tapctl.stdout: # FIXME: tap-ctl writes error messages to stdout and # confuses this parser if line == "blktap kernel module not installed\n": # This isn't pretty but (a) neither is confusing stdout/stderr # and at least causes the error to describe the fix raise Exception, "blktap kernel module not installed: try 'modprobe blktap'" row = {}
for field in line.rstrip().split(' ', 3): bits = field.split('=') if len(bits) == 2: key, val = field.split('=')
if key in ('pid', 'minor'): row[key] = int(val, 10)
elif key in ('state'): row[key] = int(val, 0x10)
else: row[key] = val else: util.SMlog("Ignoring unexpected tap-ctl output: %s" % repr(field)) yield row
tapctl._wait(True)
def list(cls, **args):
    """
    Return the rows produced by cls.__list(**args) as a list.

    A CommandFailure whose status is EPROTO or ENOENT is converted into
    RetryLoop.TransientFailure; any other failure propagates unchanged.
    """
    # FIXME. We typically get an EPROTO when uevents interleave
    # with SM ops and a tapdisk shuts down under our feet. Should
    # be fixed in SM.
    try:
        rows = list(cls.__list(**args))
    except cls.CommandFailure as failure:
        if failure.status in (errno.EPROTO, errno.ENOENT):
            raise RetryLoop.TransientFailure(failure)
        raise
    return rows
args = [ "allocate" ] args += cls._maybe("-d", devpath) return cls._pread(args)
def free(cls, minor):
    """Invoke the 'free' sub-command for @minor; the output is discarded."""
    cls._pread(["free", "-m", minor])
def spawn(cls):
    """
    Invoke the 'spawn' sub-command and return its output as an int pid.

    A CommandFailure with status 1 is re-raised as
    RetryLoop.TransientFailure; other failures propagate unchanged.
    """
    try:
        return int(cls._pread(["spawn"]))
    except cls.CommandFailure as ce:
        # intermittent failures to spawn. CA-292268
        if ce.status == 1:
            raise RetryLoop.TransientFailure(ce)
        raise
def attach(cls, pid, minor):
    """Invoke the 'attach' sub-command for tapdisk @pid and device @minor."""
    cls._pread(["attach", "-p", pid, "-m", minor])
def detach(cls, pid, minor):
    """Invoke the 'detach' sub-command for tapdisk @pid and device @minor."""
    cls._pread(["detach", "-p", pid, "-m", minor])
def open(cls, pid, minor, _type, _file, options):
    """
    Invoke the 'open' sub-command for the image (_type, _file) on
    tapdisk @pid / device @minor.

    Entries of @options are translated into command-line flags in a
    fixed order: rdonly (-R), lcache (-r), existing_prt (-e), secondary
    (-2), standby (-s), timeout (-t), o_direct false (-D), cbtlog (-C),
    key_hash (-E). With key_hash set, the key is loaded through the
    plugins module and passed to the command as its input.

    Raises util.SMException if no key is found for the given key hash.
    """
    image = Tapdisk.Arg(_type, _file)
    argv = ["open", "-p", pid, "-m", minor, '-a', str(image)]
    key_input = None

    if options.get("rdonly"):
        argv.append('-R')

    if options.get("lcache"):
        argv.append("-r")

    if options.get("existing_prt") is not None:
        argv += ["-e", str(options["existing_prt"])]

    if options.get("secondary"):
        argv += ["-2", options["secondary"]]

    if options.get("standby"):
        argv.append("-s")

    if options.get("timeout"):
        argv += ["-t", str(options["timeout"])]

    # o_direct defaults to enabled; -D is only passed when explicitly off.
    if not options.get("o_direct", True):
        argv.append("-D")

    if options.get('cbtlog'):
        argv += ['-C', options['cbtlog']]

    if options.get('key_hash'):
        import plugins

        key_hash = options['key_hash']
        vdi_uuid = options['vdi_uuid']
        key = plugins.load_key(key_hash, vdi_uuid)
        if not key:
            raise util.SMException("No key found with key hash {}".format(key_hash))
        key_input = key
        argv.append('-E')

    cls._pread(args=argv, input=key_input)
args = [ "close", "-p", pid, "-m", minor, "-t", "30" ] if force: args += [ "-f" ] cls._pread(args)
def pause(cls, pid, minor):
    """Invoke the 'pause' sub-command for tapdisk @pid and device @minor."""
    cls._pread(["pause", "-p", pid, "-m", minor])
cbtlog = None): args = [ "unpause", "-p", pid, "-m", minor ] if mirror: args.extend(["-2", mirror]) if _type and _file: params = Tapdisk.Arg(_type, _file) args += [ "-a", str(params) ] if cbtlog: args.extend(["-c", cbtlog]) cls._pread(args)
def stats(cls, pid, minor):
    """Return the output of the 'stats' sub-command, run quietly."""
    argv = ["stats", "-p", pid, "-m", minor]
    output = cls._pread(argv, quiet=True)
    return output
def major(cls):
    """Return the output of the 'major' sub-command converted to int."""
    return int(cls._pread(["major"]))
"""Tapdisk already running."""
self.tapdisk = tapdisk
return "%s already running" % self.tapdisk
"""No such Tapdisk."""
self.attrs = attrs
items = self.attrs.iteritems() attrs = ", ".join("%s=%s" % attr for attr in items) return "No such Tapdisk(%s)" % attrs
"""More than one tapdisk on one path."""
self.tapdisks = tapdisks
tapdisks = map(str, self.tapdisks) return "Found multiple tapdisks: %s" % tapdisks
"""Tapdisk launch failure."""
self.arg = arg self.err = err
return "Tapdisk(%s): %s" % (self.arg, self.err)
return self.err
"""Tapdisk pause/unpause failure"""
self.tapdisk = tapdisk
return str(self.tapdisk)
if not os.path.exists(path): parent, subdir = os.path.split(path) assert parent != path try: if parent: mkdirs(parent, mode) if subdir: os.mkdir(path, mode) except OSError, e: if e.errno != errno.EEXIST: raise
raise NotImplementedError("sysfs_devname is undefined")
self.path = path
def from_kobject(cls, kobj):
    """Build an instance for the SYSFS_NODENAME entry below kobj's sysfs path."""
    return cls("%s/%s" % (kobj.sysfs_path(), cls.SYSFS_NODENAME))
self.name = name
return "No such attribute: %s" % self.name
try: return file(self.path, mode) except IOError, e: if e.errno == errno.ENOENT: raise self.NoSuchAttribute(self) raise
f = self._open('r') s = f.readline().rstrip() f.close() return s
f = self._open('w') f.write(val) f.close()
def sysfs_class_path(cls):
    """Return the /sys/class directory named after cls.SYSFS_CLASSTYPE."""
    class_type = cls.SYSFS_CLASSTYPE
    return "/sys/class/%s" % class_type
return "%s/%s" % (self.sysfs_class_path(), self.sysfs_devname())
self.minor = minor self._pool = None self._task = None
def allocate(cls):
    """
    Allocate a tap device through TapCtl and return an instance for the
    minor number parsed from the reported device name.
    """
    # FIXME. Should rather go into init.
    mkdirs(cls.DEV_BASEDIR)

    allocated = TapCtl.allocate()
    return cls(Tapdisk._parse_minor(allocated))
TapCtl.free(self.minor)
return "%s(minor=%d)" % (self.__class__.__name__, self.minor)
return "blktap!blktap%d" % self.minor
if not self._pool: self._pool = self.Pool.from_kobject(self) return self._pool
return self.get_pool_attr().readline()
self.get_pool_attr().writeline(name)
self.get_pool().set_size(pages)
return BlktapControl.get_pool(self.get_pool_name())
self.set_pool_name(pool.name)
if not self._task: self._task = self.Task.from_kobject(self) return self._task
pid = self.get_task_attr().readline() try: return int(pid) except ValueError: return None
pid = self.get_task_pid() if pid is None: return None
return Tapdisk.find(pid=pid, minor=self.minor)
tapdisk = self.find_tapdisk() if not tapdisk: raise TapdiskNotRunning(minor=self.minor) return tapdisk
state = self.pause_state() return "Tapdisk(%s, pid=%d, minor=%s, state=%s)" % \ (self.get_arg(), self.pid, self.minor, state)
def list(cls, **args):
    """
    Yield a Tapdisk for every TapCtl.list(**args) row that is complete.

    A row is complete when pid, minor, state, type and path are all
    known; type and path come from the row's 'args' field when present.
    Incomplete rows are skipped.
    """
    wanted = ('pid', 'minor', 'state', '_type', 'path')

    for row in TapCtl.list(**args):
        attrs = dict.fromkeys(wanted)

        for key, val in row.items():
            if key in attrs:
                attrs[key] = val

        if 'args' in row:
            image = Tapdisk.Arg.parse(row['args'])
            attrs['_type'] = image.type
            attrs['path'] = image.path

        if None in attrs.values():
            continue

        yield Tapdisk(**attrs)
def find(cls, **args):
    """
    Return the single tapdisk matching @args, or None if there is none.

    Raises TapdiskNotUnique when more than one tapdisk matches.
    """
    matches = list(cls.list(**args))

    if not matches:
        return None

    if len(matches) > 1:
        raise TapdiskNotUnique(matches)

    return matches[0]
def find_by_path(cls, path):
    """Shorthand for cls.find(path=path)."""
    match = cls.find(path=path)
    return match
def find_by_minor(cls, minor):
    """Shorthand for cls.find(minor=minor)."""
    match = cls.find(minor=minor)
    return match
def get(cls, **attrs):
    """
    Like find(), but raise TapdiskNotRunning(**attrs) instead of
    returning a false value when nothing matches.
    """
    found = cls.find(**attrs)
    if found:
        return found
    raise TapdiskNotRunning(**attrs)
def from_path(cls, path):
    """Shorthand for cls.get(path=path)."""
    tapdisk = cls.get(path=path)
    return tapdisk
def from_minor(cls, minor):
    """Shorthand for cls.get(minor=minor)."""
    tapdisk = cls.get(minor=minor)
    return tapdisk
def __from_blktap(cls, blktap): tapdisk = cls.from_minor(minor=blktap.minor) tapdisk._blktap = blktap return tapdisk
if not self._blktap: self._blktap = Blktap(self.minor) return self._blktap
self.type = _type self.path = path
return "%s:%s" % (self.type, self.path)
def parse(cls, arg):
    """
    Parse a "<type>:<path>" string into an instance.

    Raises cls.InvalidArgument if @arg contains no ':' separator and
    cls.InvalidType if the type is not listed in Tapdisk.TYPES.
    """
    pieces = arg.split(":", 1)
    if len(pieces) != 2:
        raise cls.InvalidArgument(arg)
    _type, path = pieces

    if _type not in Tapdisk.TYPES:
        raise cls.InvalidType(_type)

    return cls(_type, path)
self.type = _type
return "Not a Tapdisk type: %s" % self.type
self.arg = arg
return "Not a Tapdisk image: %s" % self.arg
return self.Arg(self.type, self.path)
return "%s/tapdev%d" % (Blktap.DEV_BASEDIR, self.minor)
def launch_from_arg(cls, arg):
    """Parse a "<type>:<path>" string and launch it with rdonly=False."""
    parsed = cls.Arg.parse(arg)
    return cls.launch(parsed.path, parsed.type, False)
def launch_on_tap(cls, blktap, path, _type, options):
    """
    Start a tapdisk for image (_type, path) on the minor of @blktap.

    Steps are spawn, attach, open; each step that fails is unwound by
    the matching except clause (close, detach, then SIGTERM + reap of
    the spawned process) before the error is re-raised.

    Returns the Tapdisk found on blktap's minor after a successful open.

    Raises TapdiskExists if a tapdisk is already serving @path,
    xs_errors.XenError('TapdiskDriveEmpty') for status 123 on a
    /dev/xapi/cd/ path, and TapdiskFailed for any other
    TapCtl.CommandFailure.
    """

    tapdisk = cls.find_by_path(path)
    if tapdisk:
        raise TapdiskExists(tapdisk)

    minor = blktap.minor

    try:
        pid = TapCtl.spawn()

        try:
            TapCtl.attach(pid, minor)

            try:
                TapCtl.open(pid, minor, _type, path, options)
                try:
                    tapdisk = cls.__from_blktap(blktap)
                    node = '/sys/dev/block/%d:%d' % (tapdisk.major(), tapdisk.minor)
                    util.set_scheduler_sysfs_node(node, 'noop')
                    return tapdisk
                except:
                    # Undo the open before the detach handler below runs.
                    TapCtl.close(pid, minor)
                    raise

            except:
                TapCtl.detach(pid, minor)
                raise

        except:
            # Keep the original exception; the kill/waitpid below may
            # raise themselves, and the finally clause re-raises the
            # saved one regardless.
            exc_info = sys.exc_info()
            # FIXME: Should be tap-ctl shutdown.
            try:
                import signal
                os.kill(pid, signal.SIGTERM)
                os.waitpid(pid, 0)
            finally:
                raise exc_info[0], exc_info[1], exc_info[2]

    except TapCtl.CommandFailure, ctl:
        util.logException(ctl)
        if ('/dev/xapi/cd/' in path and
                'status' in ctl.info and
                ctl.info['status'] == 123): # ENOMEDIUM (No medium found)
            raise xs_errors.XenError('TapdiskDriveEmpty')
        else:
            raise TapdiskFailed(cls.Arg(_type, path), ctl)
def launch(cls, path, _type, rdonly):
    """
    Allocate a blktap device and launch a tapdisk for (_type, path) on it.

    The freshly allocated device is freed again if the launch fails,
    and the failure is re-raised.
    """
    blktap = Blktap.allocate()
    try:
        tapdisk = cls.launch_on_tap(blktap, path, _type, {"rdonly": rdonly})
    except:
        blktap.free()
        raise
    return tapdisk
TapCtl.close(self.pid, self.minor, force)
TapCtl.detach(self.pid, self.minor)
self.get_blktap().free()
if not self.is_running(): raise TapdiskInvalidState(self)
TapCtl.pause(self.pid, self.minor)
self._set_dirty()
if not self.is_paused(): raise TapdiskInvalidState(self)
# FIXME: should the arguments be optional? if _type is None: _type = self.type if path is None: path = self.path
TapCtl.unpause(self.pid, self.minor, _type, path, mirror=mirror, cbtlog=cbtlog)
self._set_dirty()
return json.loads(TapCtl.stats(self.pid, self.minor))
# # NB. dirty/refresh: reload attributes on next access #
self._dirty = True
t = self.from_minor(__get('minor')) self.__init__(t.pid, t.minor, t.type, t.path, t.state)
# NB. avoid(rec(ursion)
name in ['minor', 'type', 'path', 'state']: self._refresh(__get) self._dirty = False
return not not (self.state & self.Flags.PAUSED)
return not (self.state & self.Flags.PAUSE_MASK)
if self.state & self.Flags.PAUSED: return self.PauseState.PAUSED
if self.state & self.Flags.PAUSE_REQUESTED: return self.PauseState.PAUSING
return self.PauseState.RUNNING
def _parse_minor(devpath):
    """
    Return the minor number encoded in a tap device path of the form
    <Blktap.DEV_BASEDIR>/(blktap|tapdev)<minor>.

    Raises Exception if @devpath does not end in such a name.
    """
    regex = r'%s/(blktap|tapdev)(\d+)$' % Blktap.DEV_BASEDIR
    match = re.search(regex, devpath)
    if match is None:
        raise Exception("malformed tap device: '%s' (%s) " % (devpath, regex))

    return int(match.group(2))
def major(cls):
    """
    Return the major number registered for 'tapdev' in /proc/devices.

    The value is cached in cls._major after the first successful
    lookup. If no 'tapdev' entry exists, the current (unset) value of
    cls._major is returned.
    """
    if cls._major:
        return cls._major

    # The context manager closes the file even if parsing raises.
    with open("/proc/devices") as devices:
        for line in devices:
            # split() without a separator tolerates the space padding
            # in front of short major numbers, which split(' ') turns
            # into empty leading fields.
            row = line.split()
            if len(row) != 2:
                continue

            major, name = row
            if name != 'tapdev':
                continue

            cls._major = int(major)
            break

    return cls._major
"""SR.vdi driver decorator for blktap2"""
# number of seconds on top of NFS timeo mount option the tapdisk should # wait before reporting errors. This is to allow a retry to succeed in case # packets were lost the first time around, which prevented the NFS client # from returning before the timeo is reached even if the NFS server did # come back earlier
"""Returns True/False based on licensing and caching_params""" if self.__o_direct is not None: return self.__o_direct, self.__o_direct_reason
if util.read_caching_is_restricted(self._session): self.__o_direct = True self.__o_direct_reason = "LICENSE_RESTRICTION" elif not ((self.target.vdi.sr.handles("nfs") or self.target.vdi.sr.handles("ext") or self.target.vdi.sr.handles("smb"))): self.__o_direct = True self.__o_direct_reason = "SR_NOT_SUPPORTED" elif not (options.get("rdonly") or self.target.vdi.parent): util.SMlog(self.target.vdi) self.__o_direct = True self.__o_direct_reason = "NO_RO_IMAGE" elif options.get("rdonly") and not self.target.vdi.parent: self.__o_direct = True self.__o_direct_reason = "RO_WITH_NO_PARENT" elif options.get(self.CONF_KEY_O_DIRECT): self.__o_direct = True self.__o_direct_reason = "SR_OVERRIDE"
if self.__o_direct is None: self.__o_direct = False self.__o_direct_reason = ""
return self.__o_direct, self.__o_direct_reason
def from_cli(cls, uuid):
    """
    Build an instance for VDI @uuid using a temporary local XAPI session.

    The session is logged out again before returning, including when
    the VDI lookup raises.
    """
    import VDI as sm
    import XenAPI

    session = XenAPI.xapi_local()
    session.xenapi.login_with_password('root', '', '', 'SM')

    try:
        target = sm.VDI.from_uuid(session, uuid)
        driver_info = target.sr.srcmd.driver_info
    finally:
        # Do not leak the session if from_uuid() fails.
        session.xenapi.session.logout()

    return cls(uuid, target, driver_info)
def _tap_type(vdi_type): """Map a VDI type (e.g. 'raw') to a tapdisk driver type (e.g. 'aio')""" 'raw' : 'aio', 'vhd' : 'vhd', 'iso' : 'aio', # for ISO SR 'aio' : 'aio', # for LVHD 'file' : 'aio', 'phy' : 'aio' } [vdi_type]
return self.target.get_vdi_path()
self.vdi_type = vdi_type self.target = target
return \ "Target %s has unexpected VDI type '%s'" % \ (type(self.target), self.vdi_type)
'raw' : 'phy', 'aio' : 'tap', # for LVHD raw nodes 'iso' : 'tap', # for ISOSR 'file' : 'tap', 'vhd' : 'tap' }
# 1. Let the target vdi_type decide
except KeyError: raise self.UnexpectedVDIType(vdi_type, self.target.vdi)
return True
# 2. Otherwise, there may be more reasons # # .. TBD
return False
"""Safe target driver access."""
# NB. *Must* test caps for optional calls. Some targets # actually implement some slots, but do not enable them. Just # try/except would risk breaking compatibility.
self.vdi = vdi self._caps = driver_info['capabilities']
"""Determine if target has given capability""" return cap in self._caps
#assert self.has_cap("VDI_ATTACH") return self.vdi.attach(sr_uuid, vdi_uuid)
#assert self.has_cap("VDI_DETACH") self.vdi.detach(sr_uuid, vdi_uuid)
if self.has_cap("VDI_ACTIVATE"): return self.vdi.activate(sr_uuid, vdi_uuid)
if self.has_cap("VDI_DEACTIVATE"): self.vdi.deactivate(sr_uuid, vdi_uuid)
#def resize(self, sr_uuid, vdi_uuid, size): # return self.vdi.resize(sr_uuid, vdi_uuid, size)
_type = self.vdi.vdi_type if not _type: _type = self.vdi.sr.sr_vditype if not _type: raise VDI.UnexpectedVDIType(_type, self.vdi) return _type
return self.vdi.path
"""Relink a node under a common name"""
# NB. We have to provide the device node path during # VDI.attach, but currently do not allocate the tapdisk minor # before VDI.activate. Therefore those link steps where we # relink existing devices under deterministic path names.
raise NotImplementedError("_mklink is not defined")
raise NotImplementedError("_equals is not defined")
self._path = path
def from_name(cls, name):
    """Return a link object for @name below cls.BASEDIR."""
    return cls("%s/%s" % (cls.BASEDIR, name))
def from_uuid(cls, sr_uuid, vdi_uuid):
    """Return the link object named "<sr_uuid>/<vdi_uuid>"."""
    return cls.from_name("%s/%s" % (sr_uuid, vdi_uuid))
return self._path
return os.stat(self.path())
path = self.path() util.SMlog("%s -> %s" % (self, target))
mkdirs(os.path.dirname(path)) try: self._mklink(target) except OSError, e: # We do unlink during teardown, but have to stay # idempotent. However, a *wrong* target should never # be seen. if e.errno != errno.EEXIST: raise assert self._equals(target), "'%s' not equal to '%s'" % (path, target)
try: os.unlink(self.path()) except OSError, e: if e.errno != errno.ENOENT: raise
path = self.path() return "%s(%s)" % (self.__class__.__name__, path)
"""Symlink some file to a common name"""
return os.readlink(self.path())
return self.path()
os.symlink(target, self.path())
return self.readlink() == target
"""Relink a block device node to a common name"""
def _real_stat(cls, target): """stat() not on @target, but its realpath()""" _target = os.path.realpath(target) return os.stat(_target)
def is_block(cls, target):
    """Whether @target refers to a block device."""
    mode = cls._real_stat(target).st_mode
    return S_ISBLK(mode)
st = self._real_stat(target) if not S_ISBLK(st.st_mode): raise self.NotABlockDevice(target, st)
os.mknod(self.path(), st.st_mode, st.st_rdev)
target_rdev = self._real_stat(target).st_rdev return self.stat().st_rdev == target_rdev
st = self.stat() assert S_ISBLK(st.st_mode) return os.major(st.st_rdev), os.minor(st.st_rdev)
self.path = path self.st = st
return "%s is not a block device: %s" % (self.path, self.st)
VDI.Link.__init__(self, path) self._devnode = VDI.DeviceNode(path) self._symlink = VDI.SymLink(path)
st = self.stat() if S_ISBLK(st.st_mode): return self._devnode.rdev() raise self._devnode.NotABlockDevice(self.path(), st)
if self._devnode.is_block(target): self._obj = self._devnode else: self._obj = self._symlink self._obj.mklink(target)
return self._obj._equals(target)
# NB. Cannot use DeviceNodes, e.g. FileVDIs aren't bdevs.
# NB. Could be SymLinks as well, but saving major,minor pairs in # Links enables neat state capturing when managing Tapdisks. Note # that we essentially have a tap-ctl list replacement here. For # now make it a 'Hybrid'. Likely to collapse into a DeviceNode as # soon as ISOs are tapdisks.
tapdisk = Tapdisk.find_by_path(phy_path) if not tapdisk: blktap = Blktap.allocate() blktap.set_pool_name(sr_uuid) if pool_size: blktap.set_pool_size(pool_size)
try: tapdisk = \ Tapdisk.launch_on_tap(blktap, phy_path, VDI._tap_type(vdi_type), options) except: blktap.free() raise util.SMlog("tap.activate: Launched %s" % tapdisk)
else: util.SMlog("tap.activate: Found %s" % tapdisk)
return tapdisk.get_devpath(), tapdisk
def _tap_deactivate(minor):
    """
    Shut down the tapdisk running on @minor.

    If no tapdisk is running there, a warning is logged and nothing
    else happens.
    """
    try:
        tapdisk = Tapdisk.from_minor(minor)
    except TapdiskNotRunning as e:
        util.SMlog("tap.deactivate: Warning, %s" % e)
        # NB. Should not be here unless the agent refcount
        # broke. Also, a clean shutdown should not have leaked
        # the recorded minor.
        return

    tapdisk.shutdown()
    util.SMlog("tap.deactivate: Shut down %s" % tapdisk)
""" Pauses the tapdisk.
session: a XAPI session sr_uuid: the UUID of the SR on which VDI lives vdi_uuid: the UUID of the VDI to pause failfast: controls whether the VDI lock should be acquired in a non-blocking manner """ host_ref = key[len('host_'):] util.SMlog("Calling tap-pause on host %s" % host_ref) if not cls.call_pluginhandler(session, host_ref, sr_uuid, vdi_uuid, "pause", failfast=failfast): # Failed to pause node session.xenapi.VDI.remove_from_sm_config(vdi_ref, 'paused') return False
activate_parents = False): host_ref = key[len('host_'):] util.SMlog("Calling tap-unpause on host %s" % host_ref) if not cls.call_pluginhandler(session, host_ref, sr_uuid, vdi_uuid, "unpause", secondary, activate_parents): # Failed to unpause node return False
util.SMlog("Refresh request for %s" % vdi_uuid) vdi_ref = session.xenapi.VDI.get_by_uuid(vdi_uuid) sm_config = session.xenapi.VDI.get_sm_config(vdi_ref) for key in filter(lambda x: x.startswith('host_'), sm_config.keys()): host_ref = key[len('host_'):] util.SMlog("Calling tap-refresh on host %s" % host_ref) if not cls.call_pluginhandler(session, host_ref, sr_uuid, vdi_uuid, "refresh", None, activate_parents=activate_parents): # Failed to refresh node return False return True
def tap_status(cls, session, vdi_uuid):
    """
    Report whether the disk is attached.

    Currently unconditional: neither @session nor @vdi_uuid is
    inspected and the result is always True.
    """
    attached = True
    return attached
secondary = None, activate_parents = False, failfast=False): """Optionally, activate the parent LV before unpausing""" try: args = {"sr_uuid":sr_uuid, "vdi_uuid":vdi_uuid, "failfast": str(failfast)} if secondary: args["secondary"] = secondary if activate_parents: args["activate_parents"] = "true" ret = session.xenapi.host.call_plugin( host_ref, PLUGIN_TAP_PAUSE, action, args) return ret == "True" except Exception, e: util.logException("BLKTAP2:call_pluginhandler %s" % e) return False
util.SMlog("Adding tag to: %s" % vdi_uuid) attach_mode = "RO" if writable: attach_mode = "RW" vdi_ref = self._session.xenapi.VDI.get_by_uuid(vdi_uuid) host_ref = self._session.xenapi.host.get_by_uuid(util.get_this_host()) sm_config = self._session.xenapi.VDI.get_sm_config(vdi_ref) attached_as = util.attached_as(sm_config) if NO_MULTIPLE_ATTACH and (attached_as == "RW" or \ (attached_as == "RO" and attach_mode == "RW")): util.SMlog("need to reset VDI %s" % vdi_uuid) if not resetvdis.reset_vdi(self._session, vdi_uuid, force=False, term_output=False, writable=writable): raise util.SMException("VDI %s not detached cleanly" % vdi_uuid) sm_config = self._session.xenapi.VDI.get_sm_config(vdi_ref) if sm_config.has_key('paused'): util.SMlog("Paused or host_ref key found [%s]" % sm_config) return False host_key = "host_%s" % host_ref assert not sm_config.has_key(host_key) self._session.xenapi.VDI.add_to_sm_config(vdi_ref, host_key, attach_mode) sm_config = self._session.xenapi.VDI.get_sm_config(vdi_ref) if sm_config.has_key('paused'): util.SMlog("Found paused key, aborting") self._session.xenapi.VDI.remove_from_sm_config(vdi_ref, host_key) return False util.SMlog("Activate lock succeeded") return True
vdi_ref = self._session.xenapi.VDI.get_by_uuid(vdi_uuid) sm_config = self._session.xenapi.VDI.get_sm_config(vdi_ref) if sm_config.has_key('paused'): util.SMlog("Paused key found [%s]" % sm_config) return False return True
vdi_ref = self._session.xenapi.VDI.get_by_uuid(vdi_uuid) host_ref = self._session.xenapi.host.get_by_uuid(util.get_this_host()) sm_config = self._session.xenapi.VDI.get_sm_config(vdi_ref) host_key = "host_%s" % host_ref if sm_config.has_key(host_key): self._session.xenapi.VDI.remove_from_sm_config(vdi_ref, host_key) util.SMlog("Removed host key %s for %s" % (host_key, vdi_uuid)) else: util.SMlog("_remove_tag: host key %s not found, ignore" % host_key)
pool_info = dict() vdi_ref = self.target.vdi.sr.srcmd.params.get('vdi_ref') if not vdi_ref: # attach_from_config context: HA disks don't need to be in any # special pool return pool_info session = XenAPI.xapi_local() session.xenapi.login_with_password('root', '', '', 'SM') sr_ref = self.target.vdi.sr.srcmd.params.get('sr_ref') sr_config = session.xenapi.SR.get_other_config(sr_ref) vdi_config = session.xenapi.VDI.get_other_config(vdi_ref) pool_size_str = sr_config.get(POOL_SIZE_KEY) pool_name_override = vdi_config.get(POOL_NAME_KEY) if pool_name_override: pool_name = pool_name_override pool_size_override = vdi_config.get(POOL_SIZE_KEY) if pool_size_override: pool_size_str = pool_size_override pool_size = 0 if pool_size_str: try: pool_size = int(pool_size_str) if pool_size < 1 or pool_size > MAX_FULL_RINGS: raise ValueError("outside of range") pool_size = NUM_PAGES_PER_RING * pool_size except ValueError: util.SMlog("Error: invalid mem-pool-size %s" % pool_size_str) pool_size = 0
pool_info["mem-pool"] = pool_name if pool_size: pool_info["mem-pool-size"] = str(pool_size)
session.xenapi.session.logout() return pool_info
int(self.tap.minor))
"""Return/dev/sm/backend symlink path""" self.xenstore_data.update(self._get_pool_config(sr_uuid)) if not self.target.has_cap("ATOMIC_PAUSE") or activate: util.SMlog("Attach & activate") self._attach(sr_uuid, vdi_uuid) dev_path = self._activate(sr_uuid, vdi_uuid, {"rdonly": not writable}) self.BackendLink.from_uuid(sr_uuid, vdi_uuid).mklink(dev_path) self.linkNBD(sr_uuid, vdi_uuid)
# Return backend/ link back_path = self.BackendLink.from_uuid(sr_uuid, vdi_uuid).path() if self.tap_wanted(): # Only have NBD if we also have a tap nbd_path =\ "nbd:unix:" + VDI.NBDLink.from_uuid(sr_uuid, vdi_uuid).path() else: nbd_path = ""
options = {"rdonly": not writable} options.update(caching_params) o_direct, o_direct_reason = self.get_o_direct_capability(options) struct = {'params': back_path, 'params_nbd': nbd_path, 'o_direct': o_direct, 'o_direct_reason': o_direct_reason, 'xenstore_data': self.xenstore_data} util.SMlog('result: %s' % struct)
try: f=open("%s.attach_info" % back_path, 'a') f.write(xmlrpclib.dumps((struct,), "", True)) f.close() except: pass
return xmlrpclib.dumps((struct,), "", True)
util.SMlog("blktap2.activate") options = {"rdonly": not writable} options.update(caching_params)
sr_ref = self.target.vdi.sr.srcmd.params.get('sr_ref') sr_other_config = self._session.xenapi.SR.get_other_config(sr_ref) timeout = nfs.get_nfs_timeout(sr_other_config) if timeout: # Note NFS timeout values are in deciseconds timeout = int((timeout+5) / 10) options["timeout"] = timeout + self.TAPDISK_TIMEOUT_MARGIN for i in range(self.ATTACH_DETACH_RETRY_SECS): try: if self._activate_locked(sr_uuid, vdi_uuid, options): return except util.SRBusyException: util.SMlog("SR locked, retrying") time.sleep(1) raise util.SMException("VDI %s locked" % vdi_uuid)
def _activate_locked(self, sr_uuid, vdi_uuid, options):
    """
    Wraps target.activate and adds a tapdisk.

    Returns False if the activation tag could not be added (the caller
    retries), True once the device has been activated and linked under
    backend/. On any failure during attach/activate the tag is removed
    again and the exception is re-raised.
    """
    import VDI as sm

    #util.SMlog("VDI.activate %s" % vdi_uuid)
    if self.tap_wanted():
        if not self._add_tag(vdi_uuid, not options["rdonly"]):
            return False
        # it is possible that while the VDI was paused some of its
        # attributes have changed (e.g. its size if it was inflated; or its
        # path if it was leaf-coalesced onto a raw LV), so refresh the
        # object completely
        params = self.target.vdi.sr.srcmd.params
        target = sm.VDI.from_uuid(self.target.vdi.session, vdi_uuid)
        target.sr.srcmd.params = params
        driver_info = target.sr.srcmd.driver_info
        self.target = self.TargetDriver(target, driver_info)

    try:
        util.fistpoint.activate_custom_fn(
                "blktap_activate_inject_failure",
                lambda: util.inject_failure())

        # Attach the physical node
        if self.target.has_cap("ATOMIC_PAUSE"):
            self._attach(sr_uuid, vdi_uuid)

        vdi_type = self.target.get_vdi_type()

        # Take lvchange-p Lock before running
        # tap-ctl open
        # Needed to avoid race with lvchange -p which is
        # now taking the same lock
        # This is a fix for CA-155766
        if hasattr(self.target.vdi.sr, 'DRIVER_TYPE') and \
           self.target.vdi.sr.DRIVER_TYPE == 'lvhd' and \
           vdi_type == vhdutil.VDI_TYPE_VHD:
            lock = Lock("lvchange-p", lvhdutil.NS_PREFIX_LVM + sr_uuid)
            lock.acquire()

        # When we attach a static VDI for HA, we cannot communicate with
        # xapi, because it has not started yet. These VDIs are raw.
        if vdi_type != vhdutil.VDI_TYPE_RAW:
            session = self.target.vdi.session
            vdi_ref = session.xenapi.VDI.get_by_uuid(vdi_uuid)
            sm_config = session.xenapi.VDI.get_sm_config(vdi_ref)
            if 'key_hash' in sm_config:
                key_hash = sm_config['key_hash']
                options['key_hash'] = key_hash
                options['vdi_uuid'] = vdi_uuid
                util.SMlog('Using key with hash {} for VDI {}'.format(key_hash, vdi_uuid))
        # Activate the physical node
        dev_path = self._activate(sr_uuid, vdi_uuid, options)

        # NOTE(review): the release below only runs when _activate()
        # returns normally; the except path does not release the lock.
        # Confirm whether Lock is cleaned up elsewhere.
        if hasattr(self.target.vdi.sr, 'DRIVER_TYPE') and \
           self.target.vdi.sr.DRIVER_TYPE == 'lvhd' and \
           self.target.get_vdi_type() == vhdutil.VDI_TYPE_VHD:
            lock.release()
    except:
        util.SMlog("Exception in activate/attach")
        if self.tap_wanted():
            util.fistpoint.activate_custom_fn(
                    "blktap_activate_error_handling",
                    lambda: time.sleep(30))
            # Roll back the tag added above; only a server-side
            # protocol error is retried, anything else is logged and
            # abandoned.
            while True:
                try:
                    self._remove_tag(vdi_uuid)
                    break
                except xmlrpclib.ProtocolError, e:
                    # If there's a connection error, keep trying forever.
                    if e.errcode == httplib.INTERNAL_SERVER_ERROR:
                        continue
                    else:
                        util.SMlog('failed to remove tag: %s' % e)
                        break
                except Exception, e:
                    util.SMlog('failed to remove tag: %s' % e)
                    break
        raise

    # Link result to backend/
    self.BackendLink.from_uuid(sr_uuid, vdi_uuid).mklink(dev_path)
    self.linkNBD(sr_uuid, vdi_uuid)
    return True
vdi_options = self.target.activate(sr_uuid, vdi_uuid)
dev_path = self.setup_cache(sr_uuid, vdi_uuid, options) if not dev_path: phy_path = self.PhyLink.from_uuid(sr_uuid, vdi_uuid).readlink() # Maybe launch a tapdisk on the physical link if self.tap_wanted(): vdi_type = self.target.get_vdi_type() options["o_direct"] = self.get_o_direct_capability(options)[0] if vdi_options: options.update(vdi_options) dev_path, self.tap = self._tap_activate(phy_path, vdi_type, sr_uuid, options, self._get_pool_config(sr_uuid).get("mem-pool-size")) else: dev_path = phy_path # Just reuse phy
return dev_path
attach_info = xmlrpclib.loads(self.target.attach(sr_uuid, vdi_uuid))[0][0] params = attach_info['params'] xenstore_data = attach_info['xenstore_data'] phy_path = util.to_plain_string(params) self.xenstore_data.update(xenstore_data) # Save it to phy/ self.PhyLink.from_uuid(sr_uuid, vdi_uuid).mklink(phy_path)
util.SMlog("blktap2.deactivate") for i in range(self.ATTACH_DETACH_RETRY_SECS): try: if self._deactivate_locked(sr_uuid, vdi_uuid, caching_params): return except util.SRBusyException, e: util.SMlog("SR locked, retrying") time.sleep(1) raise util.SMException("VDI %s locked" % vdi_uuid)
def _deactivate_locked(self, sr_uuid, vdi_uuid, caching_params):
    """Run the deactivation sequence for one VDI.

    Returns False, without deactivating anything, when a tapdisk is wanted
    but the tag check for `vdi_uuid` fails. Otherwise deactivates the VDI,
    detaches it as well if the target has the ATOMIC_PAUSE capability,
    removes the tag when a tapdisk is wanted, and returns True.
    """
    if self.tap_wanted():
        if not self._check_tag(vdi_uuid):
            return False

    self._deactivate(sr_uuid, vdi_uuid, caching_params)

    atomic_pause = self.target.has_cap("ATOMIC_PAUSE")
    if atomic_pause:
        self._detach(sr_uuid, vdi_uuid)

    if self.tap_wanted():
        self._remove_tag(vdi_uuid)

    return True
self.PhyLink.from_uuid(sr_uuid, vdi_uuid).mklink(path)
if not self.target.has_cap("ATOMIC_PAUSE") or deactivate: util.SMlog("Deactivate & detach") self._deactivate(sr_uuid, vdi_uuid, caching_params) self._detach(sr_uuid, vdi_uuid) else: pass # nothing to do
import VDI as sm
# Shutdown tapdisk back_link = self.BackendLink.from_uuid(sr_uuid, vdi_uuid)
if not util.pathexists(back_link.path()): util.SMlog("Backend path %s does not exist" % back_link.path()) return
try: attach_info_path = "%s.attach_info" % (back_link.path()) os.unlink(attach_info_path) except: util.SMlog("unlink of attach_info failed")
try: major, minor = back_link.rdev() except self.DeviceNode.NotABlockDevice: pass else: if major == Tapdisk.major(): self._tap_deactivate(minor) self.remove_cache(sr_uuid, vdi_uuid, caching_params)
# Remove the backend link back_link.unlink() VDI.NBDLink.from_uuid(sr_uuid, vdi_uuid).unlink()
# Deactivate & detach the physical node if self.tap_wanted() and self.target.vdi.session is not None: # it is possible that while the VDI was paused some of its # attributes have changed (e.g. its size if it was inflated; or its # path if it was leaf-coalesced onto a raw LV), so refresh the # object completely target = sm.VDI.from_uuid(self.target.vdi.session, vdi_uuid) driver_info = target.sr.srcmd.driver_info self.target = self.TargetDriver(target, driver_info)
self.target.deactivate(sr_uuid, vdi_uuid)
self.target.detach(sr_uuid, vdi_uuid)
# Remove phy/ self.PhyLink.from_uuid(sr_uuid, vdi_uuid).unlink()
# Remove existing VDI.sm_config fields vdi_ref = session.xenapi.VDI.get_by_uuid(vdi_uuid) for key in ["on_boot", "caching"]: session.xenapi.VDI.remove_from_sm_config(vdi_ref,key) if not on_boot is None: session.xenapi.VDI.add_to_sm_config(vdi_ref,'on_boot',on_boot) if not caching is None: session.xenapi.VDI.add_to_sm_config(vdi_ref,'caching',caching)
if params.get(self.CONF_KEY_ALLOW_CACHING) != "true": return
util.SMlog("Requested local caching") if not self.target.has_cap("SR_CACHING"): util.SMlog("Error: local caching not supported by this SR") return
scratch_mode = False if params.get(self.CONF_KEY_MODE_ON_BOOT) == "reset": scratch_mode = True util.SMlog("Requested scratch mode") if not self.target.has_cap("VDI_RESET_ON_BOOT/2"): util.SMlog("Error: scratch mode not supported by this SR") return
dev_path = None local_sr_uuid = params.get(self.CONF_KEY_CACHE_SR) if not local_sr_uuid: util.SMlog("ERROR: Local cache SR not specified, not enabling") return dev_path = self._setup_cache(self._session, sr_uuid, vdi_uuid, local_sr_uuid, scratch_mode, params)
if dev_path: self._updateCacheRecord(self._session, self.target.vdi.uuid, params.get(self.CONF_KEY_MODE_ON_BOOT), params.get(self.CONF_KEY_ALLOW_CACHING))
return dev_path
vm_uuid = None vm_label = "" try: cache_sr_ref = session.xenapi.SR.get_by_uuid(cache_sr_uuid) cache_sr_rec = session.xenapi.SR.get_record(cache_sr_ref) cache_sr_label = cache_sr_rec.get("name_label")
host_ref = session.xenapi.host.get_by_uuid(util.get_this_host()) host_rec = session.xenapi.host.get_record(host_ref) host_label = host_rec.get("name_label")
vdi_ref = session.xenapi.VDI.get_by_uuid(vdi_uuid) vbds = session.xenapi.VBD.get_all_records_where( \ "field \"VDI\" = \"%s\"" % vdi_ref) for vbd_rec in vbds.values(): vm_ref = vbd_rec.get("VM") vm_rec = session.xenapi.VM.get_record(vm_ref) vm_uuid = vm_rec.get("uuid") vm_label = vm_rec.get("name_label") except: util.logException("alert_no_cache")
alert_obj = "SR" alert_uuid = str(cache_sr_uuid) alert_str = "No space left in Local Cache SR %s" % cache_sr_uuid if vm_uuid: alert_obj = "VM" alert_uuid = vm_uuid reason = "" if err == errno.ENOSPC: reason = "because there is no space left" alert_str = "The VM \"%s\" is not using IntelliCache %s on the Local Cache SR (\"%s\") on host \"%s\"" % \ (vm_label, reason, cache_sr_label, host_label)
util.SMlog("Creating alert: (%s, %s, \"%s\")" % \ (alert_obj, alert_uuid, alert_str)) session.xenapi.message.create("No space left in local cache", "3", alert_obj, alert_uuid, alert_str)
scratch_mode, options): import SR import EXTSR import NFSSR import XenAPI from lock import Lock from FileSR import FileVDI
parent_uuid = vhdutil.getParent(self.target.vdi.path, FileVDI.extractUuid) if not parent_uuid: util.SMlog("ERROR: VDI %s has no parent, not enabling" % \ self.target.vdi.uuid) return
util.SMlog("Setting up cache") parent_uuid = parent_uuid.strip() shared_target = NFSSR.NFSFileVDI(self.target.vdi.sr, parent_uuid)
if shared_target.parent: util.SMlog("ERROR: Parent VDI %s has parent, not enabling" % shared_target.uuid) return
SR.registerSR(EXTSR.EXTSR) local_sr = SR.SR.from_uuid(session, local_sr_uuid)
lock = Lock(self.LOCK_CACHE_SETUP, parent_uuid) lock.acquire()
# read cache read_cache_path = "%s/%s.vhdcache" % (local_sr.path, shared_target.uuid) if util.pathexists(read_cache_path): util.SMlog("Read cache node (%s) already exists, not creating" % \ read_cache_path) else: try: vhdutil.snapshot(read_cache_path, shared_target.path, False) except util.CommandException, e: util.SMlog("Error creating parent cache: %s" % e) self.alert_no_cache(session, vdi_uuid, local_sr_uuid, e.code) return None
# local write node leaf_size = vhdutil.getSizeVirt(self.target.vdi.path) local_leaf_path = "%s/%s.vhdcache" % \ (local_sr.path, self.target.vdi.uuid) if util.pathexists(local_leaf_path): util.SMlog("Local leaf node (%s) already exists, deleting" % \ local_leaf_path) os.unlink(local_leaf_path) try: vhdutil.snapshot(local_leaf_path, read_cache_path, False, msize = leaf_size / 1024 / 1024, checkEmpty = False) except util.CommandException, e: util.SMlog("Error creating leaf cache: %s" % e) self.alert_no_cache(session, vdi_uuid, local_sr_uuid, e.code) return None
local_leaf_size = vhdutil.getSizeVirt(local_leaf_path) if leaf_size > local_leaf_size: util.SMlog("Leaf size %d > local leaf cache size %d, resizing" % (leaf_size, local_leaf_size)) vhdutil.setSizeVirtFast(local_leaf_path, leaf_size)
vdi_type = self.target.get_vdi_type()
prt_tapdisk = Tapdisk.find_by_path(read_cache_path) if not prt_tapdisk: parent_options = copy.deepcopy(options) parent_options["rdonly"] = False parent_options["lcache"] = True
blktap = Blktap.allocate() try: blktap.set_pool_name("lcache-parent-pool-%s" % blktap.minor) # no need to change pool_size since each parent tapdisk is in # its own pool prt_tapdisk = \ Tapdisk.launch_on_tap(blktap, read_cache_path, 'vhd', parent_options) except: blktap.free() raise
secondary = "%s:%s" % (self.target.get_vdi_type(), self.PhyLink.from_uuid(sr_uuid, vdi_uuid).readlink())
util.SMlog("Parent tapdisk: %s" % prt_tapdisk) leaf_tapdisk = Tapdisk.find_by_path(local_leaf_path) if not leaf_tapdisk: blktap = Blktap.allocate() child_options = copy.deepcopy(options) child_options["rdonly"] = False child_options["lcache"] = False child_options["existing_prt"] = prt_tapdisk.minor child_options["secondary"] = secondary child_options["standby"] = scratch_mode try: leaf_tapdisk = \ Tapdisk.launch_on_tap(blktap, local_leaf_path, 'vhd', child_options) except: blktap.free() raise
lock.release()
util.SMlog("Local read cache: %s, local leaf: %s" % \ (read_cache_path, local_leaf_path))
self.tap = leaf_tapdisk return leaf_tapdisk.get_devpath()
if not self.target.has_cap("SR_CACHING"): return
caching = params.get(self.CONF_KEY_ALLOW_CACHING) == "true"
local_sr_uuid = params.get(self.CONF_KEY_CACHE_SR) if caching and not local_sr_uuid: util.SMlog("ERROR: Local cache SR not specified, ignore") return
if caching: self._remove_cache(self._session, local_sr_uuid)
if self._session is not None: self._updateCacheRecord(self._session, self.target.vdi.uuid, None, None)
(retVal, links) = util.findRunningProcessOrOpenFile("tapdisk") if not retVal: # err on the side of caution return True
for link in links: if link.find("tapdev%d" % minor) != -1: return True return False
import SR import EXTSR import NFSSR import XenAPI from lock import Lock from FileSR import FileVDI
parent_uuid = vhdutil.getParent(self.target.vdi.path, FileVDI.extractUuid) if not parent_uuid: util.SMlog("ERROR: No parent for VDI %s, ignore" % \ self.target.vdi.uuid) return
util.SMlog("Tearing down the cache")
parent_uuid = parent_uuid.strip() shared_target = NFSSR.NFSFileVDI(self.target.vdi.sr, parent_uuid)
SR.registerSR(EXTSR.EXTSR) local_sr = SR.SR.from_uuid(session, local_sr_uuid)
# Serialise against cache setup for the same parent VDI. The lock is
# released in a `finally` so that a failure while deleting the leaf or
# looking up the parent tapdisk cannot leave it held.
lock = Lock(self.LOCK_CACHE_SETUP, parent_uuid)
lock.acquire()
try:
    # local write node
    local_leaf_path = "%s/%s.vhdcache" % \
            (local_sr.path, self.target.vdi.uuid)
    if util.pathexists(local_leaf_path):
        util.SMlog("Deleting local leaf node %s" % local_leaf_path)
        os.unlink(local_leaf_path)

    read_cache_path = "%s/%s.vhdcache" % (local_sr.path, shared_target.uuid)
    prt_tapdisk = Tapdisk.find_by_path(read_cache_path)
    if not prt_tapdisk:
        util.SMlog("Parent tapdisk not found")
    elif not self._is_tapdisk_in_use(prt_tapdisk.minor):
        util.SMlog("Parent tapdisk not in use: shutting down %s" % \
                read_cache_path)
        try:
            prt_tapdisk.shutdown()
        except:
            # Best effort: a failed shutdown is logged, not propagated.
            util.logException("shutting down parent tapdisk")
    else:
        util.SMlog("Parent tapdisk still in use: %s" % read_cache_path)

    # the parent cache files are removed during the local SR's background
    # GC run
finally:
    lock.release()
self._action = None
return \ "Key '%s' missing in environment. " % self.args[0] + \ "Not called in udev context?"
def getenv(cls, key):
    """Return the value of environment variable `key`.

    Raises cls.KeyError, carrying the variable name, when `key` is not
    present in os.environ.
    """
    try:
        return os.environ[key]
    except KeyError as e:
        # Re-raise as the handler's own error type so its message applies.
        raise cls.KeyError(e.args[0])
if not self._action: self._action = self.getenv('ACTION') return self._action
self.event = event self.handler = handler
return "Uevent '%s' not handled by %s" % \ (self.event, self.handler.__class__.__name__)
action = self.get_action() try: fn = self.ACTIONS[action] except KeyError: raise self.UnhandledEvent(action, self)
return fn(self)
try: action = self.get_action() except: action = None return "%s[%s]" % (self.__class__.__name__, action)
return "blktap!control"
if not self._default_pool: self._default_pool = self.DefaultPool.from_kobject(self) return self._default_pool
return self.get_default_pool_attr().readline()
self.get_default_pool_attr().writeline(name)
return BlktapControl.get_pool(self.get_default_pool_name())
self.set_default_pool_name(pool.name)
self.name = name
return "No such pool: %s", self.name
path = "%s/pools/%s" % (self.sysfs_path(), name)
if not os.path.isdir(path): raise self.NoSuchPool(name)
return PagePool(path)
self.path = path self._size = None
return self.path
if not self._size: self._size = self.Size.from_kobject(self) return self._size
pages = str(pages) self.get_size_attr().writeline(pages)
pages = self.get_size_attr().readline() return int(pages)
def sysfs_bus_path(cls):
    """Return the /sys/bus directory for this class's SYSFS_BUSTYPE."""
    bus_type = cls.SYSFS_BUSTYPE
    return "/sys/bus/%s" % bus_type
path = "%s/devices/%s" % (self.sysfs_bus_path(), self.sysfs_devname())
return path
"""Xenbus device, in XS and sysfs"""
self.domid = int(domid) self.devid = int(devid) self._xbt = XenbusDevice.XBT_NIL
import xen.lowlevel.xs self.xs = xen.lowlevel.xs.xs()
path = "backend/%s/%d/%d" % (self.XENBUS_DEVTYPE, self.domid, self.devid) if key is not None: path = "%s/%s" % (path, key)
return path
syslog(prio, msg)
self._log(_syslog.LOG_INFO, msg)
self._log(_syslog.LOG_WARNING, "WARNING: " + msg)
val = self.xs.read(self._xbt, path) #self.info("read %s = '%s'" % (path, val)) return val
self.xs.write(self._xbt, path, val); self.info("wrote %s = '%s'" % (path, val))
self.xs.rm(self._xbt, path) self.info("removed %s" % path)
return self._xs_read_path(self.xs_path(key))
return self.read(key) is not None
self._xs_write_path(self.xs_path(key), val)
self._xs_rm_path(self.xs_path(key))
return self.has_key(None)
assert(self._xbt == XenbusDevice.XBT_NIL) self._xbt = self.xs.transaction_start()
ok = self.xs.transaction_end(self._xbt, 0) self._xbt = XenbusDevice.XBT_NIL return ok
ok = self.xs.transaction_end(self._xbt, 1) assert(ok == True) self._xbt = XenbusDevice.XBT_NIL
"""The standard protocol is: toolstack writes 'params', linux hotplug script translates this into physical-device=%x:%x""" if self.has_key("physical-device"): return try: params = self.read("params") frontend = self.read("frontend") is_cdrom = self._xs_read_path("%s/device-type") == "cdrom" # We don't have PV drivers for CDROM devices, so we prevent blkback # from opening the physical-device if not(is_cdrom): major_minor = os.stat(params).st_rdev major, minor = divmod(major_minor, 256) self.write("physical-device", "%x:%x" % (major, minor)) except: util.logException("BLKTAP2:create_physical_device")
xapi_path = "/xapi/%d/hotplug/%s/%d/hotplug" % (self.domid, self.XENBUS_DEVTYPE, self.devid) upstream_path = self.xs_path("hotplug-status") if online: self._xs_write_path(xapi_path, "online") self._xs_write_path(upstream_path, "connected") else: self._xs_rm_path(xapi_path) self._xs_rm_path(upstream_path)
return "%s-%d-%d" % (self.XENBUS_DEVTYPE, self.domid, self.devid)
return self.sysfs_devname()
def find(cls):
    """Yield one cls(domid, devid) per sysfs device entry of this type.

    Entries are globbed under /sys/bus/<SYSFS_BUSTYPE>/devices using
    XENBUS_DEVTYPE as prefix; each basename is split on '-' into three
    parts, of which the last two are passed on (as strings) to cls.
    """
    pattern = "/sys/bus/%s/devices/%s*" % (cls.SYSFS_BUSTYPE,
                                           cls.XENBUS_DEVTYPE)
    for entry in glob.glob(pattern):
        _type, domid, devid = os.path.basename(entry).split('-')
        yield cls(domid, devid)
"""Xenbus backend device"""
def from_xs_path(cls, _path):
    """Build an instance from a 'backend/<type>/<domid>/<devid>' path.

    The path must split on '/' into exactly four parts; the first two are
    asserted to be 'backend' and cls.XENBUS_DEVTYPE, the last two are
    converted to int and passed to cls.
    """
    parts = _path.split('/')
    (_backend, _type, domid, devid) = parts

    assert _backend == 'backend'
    assert _type == cls.XENBUS_DEVTYPE

    return cls(int(domid), int(devid))
"""A blkback VBD"""
XenBackendDevice.__init__(self, domid, devid) self._phy = None self._vdi_uuid = None self._q_state = None self._q_events = None
self.vbd = vbd self.str = _str
return "Backend %s " % self.vbd + \ "has %s = %s" % (self.KEY, self.str)
self.major = int(major) self.minor = int(minor)
def from_xbdev(cls, xbdev):
    """Build an instance from the 'physical-device' key of `xbdev`.

    The value is expected to be '<major>:<minor>' in hexadecimal. Any
    failure to parse it (including a value that is not a string) raises
    xbdev.PhysicalDeviceError(xbdev, value).
    """
    phy = xbdev.read("physical-device")

    try:
        major, minor = phy.split(':')
        major = int(major, 0x10)
        minor = int(minor, 0x10)
    except Exception:
        # Report every parse failure uniformly, keeping the raw value.
        raise xbdev.PhysicalDeviceError(xbdev, phy)

    return cls(major, minor)
return os.makedev(self.major, self.minor)
return self.major == Tapdisk.major()
return "%s:%s" % (self.major, self.minor)
return \ self.major == other.major and \ self.minor == other.minor
if not self._phy: self._phy = self.PhysicalDevice.from_xbdev(self) return self._phy
"""Blkback sysfs node to select queue-state event notifications emitted."""
return int(self.readline(), 0x10)
self.writeline("0x%x" % mask)
if not self._q_events: self._q_events = self.QueueEvents.from_kobject(self) return self._q_events
if not self._vdi_uuid: self._vdi_uuid = self.read("sm-data/vdi-uuid") return self._vdi_uuid
return self.has_key("pause")
return self.has_key("shutdown-request")
return self.has_key("shutdown-done")
return self.has_key('queue-0/kthread-pid')
def find_by_physical_device(cls, phy):
    """Yield each device from cls.find() whose physical device equals `phy`.

    Devices whose physical device lookup raises cls.PhysicalDeviceError
    are passed over.
    """
    for candidate in cls.find():
        try:
            found = candidate.get_physical_device()
        except cls.PhysicalDeviceError:
            pass
        else:
            if found == phy:
                yield candidate
def find_by_tap_minor(cls, minor):
    """Return the find_by_physical_device() result for a tapdisk minor.

    The physical device looked for pairs Tapdisk.major() with `minor`.
    """
    wanted = cls.PhysicalDevice(Tapdisk.major(), minor)
    matches = cls.find_by_physical_device(wanted)
    return matches
def find_by_tap(cls, tapdisk):
    """Return the find_by_tap_minor() result for `tapdisk`'s minor."""
    tap_minor = tapdisk.minor
    return cls.find_by_tap_minor(tap_minor)
if not self.can_tap(): return False
phy = self.get_physical_device() if phy: return phy.is_tap()
return False
"""File VDIs for bare HVM. These are directly accessible by Qemu.""" try: self.get_physical_device()
except self.PhysicalDeviceError, e: vdi_type = self.read("type")
self.info("HVM VDI: type=%s" % vdi_type)
if e.str is not None or vdi_type != 'file': raise
return True
return False
return not self.is_bare_hvm()
if not ident: ident = self.__class__.__name__
self.ident = ident self._vbd = None self._tapdisk = None
UEventHandler.__init__(self)
self.xs_path = self.getenv('XENBUS_PATH') openlog(str(self), 0, self.LOG_FACILITY)
UEventHandler.run(self)
try: path = self.xs_path except: path = None
try: action = self.get_action() except: action = None
return "%s[%s](%s)" % (self.ident, action, path)
syslog(prio, msg) util.SMlog("%s: " % self + msg)
self._log(_syslog.LOG_INFO, msg)
self._log(_syslog.LOG_WARNING, "WARNING: " + msg)
self._log(_syslog.LOG_ERR, "ERROR: " + msg)
if not self._vbd: self._vbd = Blkback.from_xs_path(self.xs_path) return self._vbd
if not self._tapdisk: minor = self.get_vbd().get_physical_device().minor self._tapdisk = Tapdisk.from_minor(minor) return self._tapdisk
# # Events #
vbd = self.get_vbd()
# Manage blkback transitions # self._manage_vbd()
vbd.create_physical_device()
vbd.signal_hotplug()
def add(self):
    """Run the private add handler, mapping a missing attribute to a retry.

    Raises RetryLoop.TransientFailure, wrapping the
    Attribute.NoSuchAttribute error, after logging a warning.
    """
    try:
        self.__add()
    except Attribute.NoSuchAttribute as e:
        #
        # FIXME: KOBJ_ADD is racing backend.probe, which
        # registers device attributes. So poll a little.
        #
        self.warn("%s, still trying." % e)
        raise RetryLoop.TransientFailure(e)
vbd = self.get_vbd()
# 1. Pause or resume tapdisk (if there is one)
if vbd.has_tap(): pass #self._pause_update_tap()
# 2. Signal Xapi.VBD.pause/resume completion
self._signal_xapi()
vbd = self.get_vbd()
# NB. Beware of spurious change events between shutdown # completion and device removal. Also, Xapi.VM.migrate will # hammer a couple extra shutdown-requests into the source VBD.
while True: vbd.begin()
if not vbd.exists() or \ vbd.shutdown_done(): break
self.__change()
if vbd.commit(): return
vbd.abort() self.info("spurious uevent, ignored.")
vbd = self.get_vbd()
vbd.signal_hotplug(False)
'change': change, 'remove': remove }
# # VDI.pause #
"""Enumerate all VBDs on our tapdisk. Returns true iff any was paused"""
tapdisk = self.get_tapdisk() TapState = Tapdisk.PauseState
PAUSED = 'P' RUNNING = 'R' PAUSED_SHUTDOWN = 'P,S' # NB. Shutdown/paused is special. We know it's not going # to restart again, so it's a RUNNING. Still better than # backtracking a removed device during Vbd.unplug completion.
next = TapState.RUNNING vbds = {}
for vbd in Blkback.find_by_tap(tapdisk): name = str(vbd)
pausing = vbd.pause_requested() closing = vbd.shutdown_requested() running = vbd.running()
if pausing: if closing and not running: vbds[name] = PAUSED_SHUTDOWN else: vbds[name] = PAUSED next = TapState.PAUSED
else: vbds[name] = RUNNING
self.info("tapdev%d (%s): %s -> %s" % (tapdisk.minor, tapdisk.pause_state(), vbds, next))
return next == TapState.PAUSED
vbd = self.get_vbd()
if self._tap_should_pause(): self._pause_tap() else: self._resume_tap()
tapdisk = self.get_tapdisk()
if not tapdisk.is_paused(): self.info("pausing %s" % tapdisk) tapdisk.pause()
tapdisk = self.get_tapdisk()
# NB. Raw VDI snapshots. Refresh the physical path and # type while resuming. vbd = self.get_vbd() vdi_uuid = vbd.get_vdi_uuid()
if tapdisk.is_paused(): self.info("loading vdi uuid=%s" % vdi_uuid) vdi = VDI.from_cli(vdi_uuid) _type = vdi.get_tap_type() path = vdi.get_phy_path() self.info("resuming %s on %s:%s" % (tapdisk, _type, path)) tapdisk.unpause(_type, path)
# # VBD.pause/shutdown #
vbd = self.get_vbd()
# NB. Hook into VBD state transitions.
events = vbd.get_queue_events()
mask = 0 mask |= events.QUEUE_PAUSE_DONE # pause/unpause mask |= events.QUEUE_SHUTDOWN_DONE # shutdown
# TODO: mask |= events.QUEUE_SHUTDOWN_REQUEST, for shutdown=force # TODO: mask |= events.QUEUE_RUNNING, for ionice updates etc
events.set_mask(mask) self.info("wrote %s = %#02x" % (events.path, mask))
vbd = self.get_vbd()
pausing = vbd.pause_requested() closing = vbd.shutdown_requested() running = vbd.running()
handled = 0
if pausing and not running: if not vbd.has_key('pause-done'): vbd.write('pause-done', '') handled += 1
if not pausing: if vbd.has_key('pause-done'): vbd.rm('pause-done') handled += 1
if closing and not running: if not vbd.has_key('shutdown-done'): vbd.write('shutdown-done', '') handled += 1
if handled > 1: self.warn("handled %d events, " % handled + "pausing=%s closing=%s running=%s" % \ (pausing, closing, running))
import sys prog = os.path.basename(sys.argv[0])
# # Simple CLI interface for manual operation # # tap.* level calls go down to local Tapdisk()s (by physical path) # vdi.* level calls run the plugin calls across host boundaries. #
def usage(stream):
    """Write the three-line command synopsis to `stream`."""
    lines = (
        "usage: %s tap.{list|major}" % prog,
        " %s tap.{launch|find|get|pause|" % prog +
        "unpause|shutdown|stats} {[<tt>:]<path>} | [minor=]<int> | .. }",
        " %s vbd.uevent" % prog,
    )
    for text in lines:
        stream.write(text + "\n")
try: cmd = sys.argv[1] except IndexError: usage(sys.stderr) sys.exit(1)
try: _class, method = cmd.split('.') except: usage(sys.stderr) sys.exit(1)
# # Local Tapdisks #
if cmd == 'tap.major':
print "%d" % Tapdisk.major()
elif cmd == 'tap.launch':
tapdisk = Tapdisk.launch_from_arg(sys.argv[2]) print >> sys.stderr, "Launched %s" % tapdisk
elif _class == 'tap':
attrs = {} for item in sys.argv[2:]: try: key, val = item.split('=') attrs[key] = val continue except ValueError: pass
try: attrs['minor'] = int(item) continue except ValueError: pass
try: arg = Tapdisk.Arg.parse(item) attrs['_type'] = arg.type attrs['path'] = arg.path continue except Tapdisk.Arg.InvalidArgument: pass
attrs['path'] = item
if cmd == 'tap.list':
for tapdisk in Tapdisk.list(**attrs): blktap = tapdisk.get_blktap() print tapdisk, print "%s: task=%s pool=%s" % \ (blktap, blktap.get_task_pid(), blktap.get_pool_name())
elif cmd == 'tap.vbds': # Find all Blkback instances for a given tapdisk
for tapdisk in Tapdisk.list(**attrs): print "%s:" % tapdisk, for vbd in Blkback.find_by_tap(tapdisk): print vbd,
else:
if not attrs: usage(sys.stderr) sys.exit(1)
try: tapdisk = Tapdisk.get(**attrs) except TypeError: usage(sys.stderr) sys.exit(1)
if cmd == 'tap.shutdown': # Shutdown a running tapdisk, or raise tapdisk.shutdown() print >> sys.stderr, "Shut down %s" % tapdisk
elif cmd == 'tap.pause': # Pause an unpaused tapdisk, or raise tapdisk.pause() print >> sys.stderr, "Paused %s" % tapdisk
elif cmd == 'tap.unpause': # Unpause a paused tapdisk, or raise tapdisk.unpause() print >> sys.stderr, "Unpaused %s" % tapdisk
elif cmd == 'tap.stats': # Gather tapdisk status stats = tapdisk.stats() print "%s:" % tapdisk print json.dumps(stats, indent=True)
else: usage(sys.stderr) sys.exit(1)
elif cmd == 'vbd.uevent':
hnd = BlkbackEventHandler(cmd)
if not sys.stdin.isatty(): try: hnd.run() except Exception, e: hnd.error("Unhandled Exception: %s" % e)
import traceback _type, value, tb = sys.exc_info() trace = traceback.format_exception(_type, value, tb) for entry in trace: for line in entry.rstrip().split('\n'): util.SMlog(line) else: hnd.run()
elif cmd == 'vbd.list':
for vbd in Blkback.find(): print vbd, \ "physical-device=%s" % vbd.get_physical_device(), \ "pause=%s" % vbd.pause_requested()
else: usage(sys.stderr) sys.exit(1) |