#!/usr/bin/env python # # lba2fs # # Copyright (c) 2009 Eduardo Habkost # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: # # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN # THE SOFTWARE. # lba2fs # Utility to find out how a given block device sector is being used. # # Based on useful documentation by Bruce Allen, Douglas Gilbert, and Frederic # BOITEUX, found at: http://smartmontools.sourceforge.net/badblockhowto.html # # The ultimate purpose is to find out which file on the filesystem, if any, is # using the sector. It supports checking partition tables, LVM volumes, and # ext2 filesystem blocks. # # The script just reads from the devices, so it should be always safe to # run it. But be very careful when using the numbers calculated by it # to write to a disk sector. Always keep your backups up to date. 
import os
import re
import subprocess
import sys


def pinfo(s):
    """Print an informational message for the user."""
    print(s)


def debug(s):
    """Debugging hook: a no-op unless the print below is re-enabled."""
    # print(s)
    pass


# Some variable name conventions, to avoid confusion
# with the many units:
#
# o_THING_UNIT - offset, expressed in UNIT, inside THING
#
# *_s   - in sectors
# *_blk - in fs blocks
# *_pe  - in PEs
# *_b   - in bytes
# *_kb  - in kibibytes

sector_size = 512


def dd_cmd(**args):
    """Build a dd(1) argument vector from keyword arguments.

    Every keyword becomes an ``operand=value`` word.  Leading underscores
    are stripped from the keyword so reserved words can be passed (``_if``
    becomes ``if``).  Operands are emitted sorted by name so the resulting
    command line is deterministic; dd itself does not care about the order.
    """
    cmd = ['dd']
    for k in sorted(args, key=lambda name: name.lstrip('_')):
        cmd.append('%s=%s' % (k.lstrip('_'), args[k]))
    return cmd


def run_dd(**args):
    """Show and run a dd command; return dd's exit status."""
    cmd = dd_cmd(**args)
    pinfo('Command: %s' % (' '.join(cmd)))
    # Flush first so our message is not printed after dd's own output.
    sys.stdout.flush()
    return subprocess.call(cmd)


def cmd_output(cmd):
    """Run *cmd* (an argument list) and return its standard output as text.

    The exit status is deliberately not checked: the callers parse whatever
    the tool printed and complain themselves when it makes no sense.
    Raises OSError when the program cannot be started at all.
    """
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                            universal_newlines=True)
    out, _ = proc.communicate()
    return out


class ProbeState(object):
    """One step of the probing chain.

    Each state remembers the state it was derived from, so that the 'back'
    command of the interactive loop can return to it.
    """

    def __init__(self, back):
        self._back = back

    def back(self):
        """Return the previous state (None for the initial state)."""
        return self._back


class CleanState(ProbeState):
    """When we've found out that it seems to be safe to overwrite the data.

    where -- the BlockDeviceOffset the conclusion was reached from
    bs    -- block size, in bytes, to use in the suggested dd command
    seek  -- offset of the block on where.dev, in units of bs
    msg   -- human readable description of the finding
    """

    def __init__(self, where, bs, seek, msg):
        ProbeState.__init__(self, where)
        self.where = where
        self.bs = bs
        self.seek = seek
        self.msg = msg

    def __str__(self):
        return self.msg

    def dump_info(self):
        """Print the finding and the dd command that would zero the block."""
        print("")
        print("GOOD: %s" % (self.msg,))
        print("")
        print("You can zero the block running the following command, but:")
        print("1) Don't do that if the device is in use (e.g. filesystem mounted)")
        print("2) *You will lose data* that is stored on the block. It looks like")
        print(" It is safe to do that on this block, but be careful. Are your")
        print(" backups up to date? 8)")
        print("")
        print(" dd if=/dev/zero of=%s bs=%d count=1 seek=%d"
              % (self.where.dev, self.bs, self.seek))
        print("")
        print("I recommend doing a read-test on the block first, to see if an I/O error")
        print("is returned. Use the 'read' command for that.")
        print("")

    def probe(self):
        """There is nothing below a clean block to probe any further."""
        print("I don't know where to go from here!")

    def test_reading(self, where):
        """Read-test the underlying sector, writing the data to file *where*."""
        # The output file must be forwarded: the underlying
        # BlockDeviceOffset.test_reading() requires it.
        self.where.test_reading(where)

    def hexdump(self):
        """Hexdump the underlying sector."""
        self.where.hexdump()


class BlockDeviceOffset(ProbeState):
    """A (device, sector_offset) tuple"""

    def __init__(self, back, dev, sector):
        ProbeState.__init__(self, back)
        self.dev = dev
        self.sector = sector

    def __str__(self):
        return "%s sector %d" % (self.dev, self.sector)

    def dump_info(self):
        print(str(self))

    @staticmethod
    def _identify(info):
        """Classify the output of file(1) for a device.

        Returns 'partition table', 'LVM volume', 'extN filesystem' (N being
        2, 3 or 4), 'swap', or None when the content is not recognized.
        """
        if 'partition' in info:
            return 'partition table'
        if 'LVM' in info:
            return 'LVM volume'
        for fs in ('ext2', 'ext3', 'ext4'):
            label = '%s filesystem' % (fs,)
            if label in info:
                return label
        # Matches both "Linux/i386 swap file" and "Linux swap file".
        if 'swap file' in info:
            return 'swap'
        return None

    def probe(self):
        """Probe for the way the device is being used

        Returns the next state found by the matching check_*() method, or
        None when the device content is not recognized.
        Raises Exception if the device looks like swap space.
        """
        info = cmd_output(['file', '-s', '-L', self.dev])
        kind = self._identify(info)
        if kind is None:
            print("I don't know what is inside device %s" % (self.dev))
            print("Output from 'file': %s" % (info))
            return None
        if kind == 'swap':
            raise Exception("%s looks like a swap partition!" % (self.dev))

        if kind == 'partition table':
            fn = self.check_partition
        elif kind == 'LVM volume':
            fn = self.check_lvm_volume
        else:
            fn = self.check_ext2_inode
        print("I think I've found: %s" % (kind))
        return fn()

    @staticmethod
    def _locate_partition(fdisk_output, sector):
        """Find the partition containing *sector* in 'fdisk -lu' output.

        Returns (partition_device, start_sector), or None if no listed
        partition contains the sector.  When ranges nest (a logical
        partition inside an extended one) the innermost range wins, as that
        is the device actually holding the data.
        """
        best = None
        for line in fdisk_output.splitlines():
            if not line.startswith('/dev/'):
                continue
            # just in case there is the boot marker
            fields = line.replace('*', '').split()
            try:
                part, start, end = fields[0], int(fields[1]), int(fields[2])
            except (IndexError, ValueError):
                # not a partition row we know how to read
                continue
            debug('%s %d %d' % (part, start, end))
            if start <= sector <= end:
                if best is None or (end - start) < (best[2] - best[1]):
                    best = (part, start, end)
        if best is None:
            return None
        return best[0], best[1]

    def check_partition(self):
        """Check on which partition the sector is located.

        Returns a new BlockDeviceOffset
        Raises Exception if no partition contains the sector.
        """
        # look for partitions
        pinfo('Checking the partition list...')
        found = self._locate_partition(
            cmd_output(['fdisk', '-lu', self.dev]), self.sector)
        if found is None:
            raise Exception("No partition found for sector %d" % (self.sector))
        part, start = found
        return BlockDeviceOffset(self, part, self.sector - start)

    # One segment of 'lvdisplay --maps' output.
    re_seg = re.compile("Logical extent (.*) to (.*):\n"
                        " *Type *\t(.*)\n"
                        " *Physical volume *\t(.*)\n"
                        " *Physical extents *\t(.*) to (.*)", re.M)

    def _le_for_pe(self, maps_output, o_pv_pe):
        """Translate a PE of this PV into an LE using 'lvdisplay --maps' text.

        Raises Exception if the segment type is not supported, or if no
        segment of the LV covers the PE on this device.
        """
        def linear_lv_map(le1, le2, pe1, pe2, o_pv_pe):
            return (o_pv_pe - pe1) + le1

        lv_map_checkers = {
            'linear': linear_lv_map,
        }

        for le1, le2, seg_type, pv_name, pe1, pe2 in self.re_seg.findall(maps_output):
            seg_type = seg_type.strip()
            pv_name = pv_name.strip()
            debug('%s %s %s %s' % (seg_type, pv_name, pe1, pe2))
            pe1, pe2 = int(pe1), int(pe2)
            le1, le2 = int(le1), int(le2)
            if self.dev == pv_name and pe1 <= o_pv_pe <= pe2:
                if seg_type not in lv_map_checkers:
                    raise Exception("I don't know how to handle LV map of type %s"
                                    % (seg_type))
                return lv_map_checkers[seg_type](le1, le2, pe1, pe2, o_pv_pe)
        # Without this the caller would silently receive None.
        raise Exception("Couldn't find PE %d of %s on the LV map"
                        % (o_pv_pe, self.dev))

    def check_lv_map(self, lv, pe_size_s, o_pv_pe):
        """Helper to check_lvm_volume()

        Returns the LE corresponding to the PE
        (pe_size_s is accepted for interface compatibility; it is not used.)
        """
        pinfo('Checking maps of LV...')
        o = cmd_output(['lvdisplay', '--maps', lv])
        debug(o)
        return self._le_for_pe(o, o_pv_pe)

    # One "Physical extent A to B:" header of 'pvdisplay -m' plus the lines
    # describing it.  The body stops at the next header, whatever the
    # indentation is, so that every PE range produces its own match.
    re_pes = re.compile(r"^[ \t]*Physical extent (\d+) to (\d+):\n"
                        r"((?:(?![ \t]*Physical extent ).*\n)*)", re.M)
    re_lv = re.compile(" *Logical volume *\t(.*)\n *Logical extents *\t(.*) to (.*)", re.M)
    re_free = re.compile(" *FREE")

    def check_lvm_volume(self):
        """Check on which LVM logical volume the sector is located

        Returns a new BlockDeviceOffset for the offset inside the LV, or a
        CleanState if the sector lies in a FREE area of the PV.
        Raises Exception when the LVM tools' output cannot be understood or
        the sector cannot be located.
        """
        pinfo('Checking the PE where the block is located')
        out = cmd_output(['pvs', '-o', 'pv_name,pe_start', '--units', 's',
                          self.dev])
        # just one line of output, except for the header
        rows = [l for l in out.splitlines()[1:] if l.strip()]
        if len(rows) != 1 or len(rows[0].split()) != 2:
            raise Exception("Unexpected output from pvs: %r" % (out))
        name, pe_start = rows[0].split()
        if name != self.dev:
            raise Exception("pvs reported %s instead of %s" % (name, self.dev))
        if pe_start[-1:] not in ('S', 's'):
            raise Exception("pe_start is not in sectors: %s" % (pe_start))
        pe_start_s = int(pe_start[:-1])
        pinfo('pe_start: %d sectors' % (pe_start_s))

        out = cmd_output(['pvdisplay', '-c', self.dev])
        rows = [l for l in out.splitlines() if l.strip()]
        debug(repr(rows))
        if len(rows) != 1:  # just one line
            raise Exception("Unexpected output from pvdisplay -c: %r" % (out))
        f = rows[0].split(':')
        # the PE size is read from field 7 and treated as kibibytes
        pe_size_kb = int(f[7])
        pe_size_b = pe_size_kb * 1024
        pe_size_s = pe_size_b // sector_size
        if pe_size_s <= 0:
            raise Exception("Invalid PE size (%d KiB) reported for %s"
                            % (pe_size_kb, self.dev))

        # sectors from pe_start
        o_pestart_s = (self.sector - pe_start_s)
        if o_pestart_s < 0:
            raise Exception("Sector is before pe_start on LVM PV")

        # PEs from pe_start, and sectors from the start of that PE
        o_pv_pe, o_pe_s = divmod(o_pestart_s, pe_size_s)
        pinfo('PE: %d' % (o_pv_pe))

        pinfo('Checking on which LV the PE is located')
        o = cmd_output(['pvdisplay', '-m', self.dev])
        debug(o)
        for pe1, pe2, info in self.re_pes.findall(o):
            if not (int(pe1) <= o_pv_pe <= int(pe2)):
                continue
            m = self.re_lv.match(info)
            if m:
                pinfo("Found PE range on map (LV)")
                lv, le1, le2 = m.groups()
                debug('%s %s %s' % (lv, le1, le2))
                o_lv_pe = self.check_lv_map(lv, pe_size_s, o_pv_pe)
                o_lv_s = o_lv_pe * pe_size_s + o_pe_s
                return BlockDeviceOffset(self, lv, o_lv_s)

            if self.re_free.match(info):
                return CleanState(self, sector_size, self.sector,
                                  "Sector inside PV FREE area. PV: %s, sector %d"
                                  % (self.dev, self.sector))

            # No match?
            raise Exception("Unknown info on map: %r" % (info))

        # No PE range match?
        raise Exception("Couldn't find PE %d on LVM map" % (o_pv_pe))

    re_blk = re.compile("^Block size: *([0-9]*)$", re.M)

    @staticmethod
    def _parse_testb_line(line):
        """Interpret debugfs' answer to 'testb': True if the block is in use."""
        if 'not in use' in line:
            return False
        if 'in use' in line:
            return True
        raise Exception("Unexpected response to inuse: %s" % (line))

    @staticmethod
    def _parse_icheck_line(line, block):
        """Return the inode number from a "BLOCK INODE" row of 'icheck'.

        Raises Exception if the row is for another block or names no inode
        (debugfs prints a placeholder text instead of a number then).
        """
        fields = line.split()
        if not fields or not fields[0].isdigit() or int(fields[0]) != block:
            raise Exception("Unexpected response to icheck: %s" % (line))
        if len(fields) < 2 or not fields[1].isdigit():
            raise Exception("Block %d is used, but no inode found" % (block))
        return int(fields[1])

    def check_ext2_inode(self):
        """Find out how the ext2/3/4 filesystem on the device uses the sector.

        Returns a CleanState if the filesystem block is not in use.
        Otherwise prints what debugfs' 'ncheck' reports for the owning inode
        and raises Exception, as there is no further state to move to.
        """
        pinfo('Checking ext2 fs block...')
        o = cmd_output(['tune2fs', '-l', self.dev])
        debug(o)
        m = self.re_blk.search(o)
        if not m or not m.group(1):
            raise Exception("Couldn't find the block size of %s" % (self.dev))
        blk_size = int(m.group(1))
        pinfo('Block size: %d' % (blk_size))
        block = (self.sector * sector_size) // blk_size
        pinfo('Block: %d' % (block))

        dbg = subprocess.Popen(['debugfs', self.dev],
                               stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                               universal_newlines=True)

        def ask(request):
            dbg.stdin.write(request + '\n')
            dbg.stdin.flush()

        try:
            ask('testb %d' % (block))
            if not self._parse_testb_line(dbg.stdout.readline()):
                return CleanState(self, blk_size, block,
                                  "ext2fs block %d at %s, not in use"
                                  % (block, self.dev))

            pinfo('Block is in use. Checking which inode uses the block...')
            ask('icheck %d' % (block))
            dbg.stdout.readline()  # skip first line
            inode = self._parse_icheck_line(dbg.stdout.readline(), block)
            pinfo('Inode: %d' % (inode))

            pinfo('Checking inode path...')
            ask('ncheck %d' % (inode))
            dbg.stdin.close()
            print(dbg.stdout.read())
            # Deliberate: the interactive loop reports this as the final word.
            raise Exception("Now you are on your own... :)")
        finally:
            # Always reap debugfs, including on the early "not in use" return.
            for stream in (dbg.stdin, dbg.stdout):
                try:
                    stream.close()
                except (IOError, OSError):
                    # debugfs may already be gone; nothing left to flush to
                    pass
            dbg.wait()

    def test_reading(self, where):
        """Read the sector with dd, writing the data to file *where*."""
        pinfo('Running read-test using dd...')
        # The status is not returned: the interactive loop would take any
        # true return value for a new state.
        run_dd(_if=self.dev, bs=sector_size, of=where, skip=self.sector, count=1)

    def hexdump(self):
        """Print a 'hexdump -C' listing of the sector."""
        cmd = dd_cmd(_if=self.dev, bs=sector_size, skip=self.sector, count=1)
        sys.stdout.flush()
        # Pipe dd straight into hexdump: the sector is binary data and must
        # not go through any text layer.
        dd = subprocess.Popen(cmd, stdout=subprocess.PIPE)
        try:
            hd = subprocess.Popen(['hexdump', '-C'], stdin=dd.stdout)
            dd.stdout.close()
            hd.wait()
        finally:
            if not dd.stdout.closed:
                dd.stdout.close()
            dd.wait()


def main(argv):
    """Interactive entry point: lba2fs DEVICE SECTOR.

    Returns 2 on a usage error; otherwise loops until the user quits, which
    ends the process through sys.exit(0).
    """
    print('lba2fs, by Eduardo Habkost ')
    print('If you want to know what to do with the output of this program, check:')
    print('http://smartmontools.sourceforge.net/badblockhowto.html')
    print('')

    prog = os.path.basename(argv[0]) if argv else 'lba2fs'
    usage = 'Usage: %s DEVICE SECTOR\n' % (prog,)
    if len(argv) < 3:
        sys.stderr.write(usage)
        return 2
    disk = argv[1]
    try:
        o_disk_s = int(argv[2])
    except ValueError:
        sys.stderr.write('Invalid sector number: %s\n' % (argv[2],))
        sys.stderr.write(usage)
        return 2

    # raw_input() on Python 2, input() on Python 3
    try:
        read_line = raw_input
    except NameError:
        read_line = input

    o = BlockDeviceOffset(None, disk, o_disk_s)

    # The lambdas look 'o' up when called, so they always act on the
    # current state even after the loop below rebinds it.
    cmds = {
        'part': ('Check partition table', lambda: o.check_partition()),
        'lvm': ('Check LVM Physical volume', lambda: o.check_lvm_volume()),
        'ext2': ('Check block on ext2 filesystem', lambda: o.check_ext2_inode()),
        'probe': ('Automatically probe the device', lambda: o.probe()),
        'back': ('Go back', lambda: o.back()),
        'read': ('Read-test the sector that was found, using dd',
                 lambda: o.test_reading('/dev/null')),
        'hexdump': ('hexdump the sector', lambda: o.hexdump()),
        '?': ('Help', lambda: help()),
        'help': ('Help', lambda: help()),
        'quit': ('Quit', lambda: bye()),
        'exit': ('Quit', lambda: bye()),
    }

    def help():
        for cmd in sorted(cmds):
            desc, _ = cmds[cmd]
            print('%-10s %s' % (cmd, desc))

    def bye():
        print('Bye!')
        sys.exit(0)

    while True:
        n = None
        print("[%s]" % (o,))
        print('Press Return to automatically probe, or enter command:')
        try:
            cmd = read_line('cmd> ').strip()
        except EOFError:
            # End of input (e.g. Ctrl-D) is a request to leave.
            print('')
            bye()
        if not cmd:
            cmd = 'probe'
        _, fn = cmds.get(cmd, (None, None))
        if not fn:
            print('Invalid command: %s' % (cmd))
            continue
        try:
            n = fn()
        except Exception as e:
            # Top-level boundary: report the problem and keep the session.
            print('Exception:')
            print('  %s' % (e,))
        else:
            if n is not None:
                o = n
                o.dump_info()


if __name__ == '__main__':
    sys.exit(main(sys.argv))

# vim: et ts=4 sw=4