test_splits takes allowed/target codecs as arguments, no longer relies on get_codec.
[audiomangler.git] / audiomangler / util.py
blob215de56677257510c666c4fb44acc42144cf71ba
1 # -*- coding: utf-8 -*-
2 ###########################################################################
3 # Copyright (C) 2008 by Andrew Mahone
4 # <andrew.mahone@gmail.com>
6 # Copyright: See COPYING file that comes with this distribution
8 ###########################################################################
9 import os, stat
10 from audiomangler.config import Config
11 from audiomangler.logging import msg, err, fatal, WARNING, ERROR
13 class ClassInitMeta(type):
14 def __new__(cls, name, bases, cls_dict):
15 class_init = cls_dict.get('__classinit__', None)
16 return super(ClassInitMeta, cls).__new__(cls, name, bases, cls_dict)
18 def __init__(self, name, bases, cls_dict):
19 if callable(getattr(self, '__classinit__', None)):
20 self.__classinit__(name, bases, cls_dict)
22 def copy(src, dst):
23 fsrc = None
24 fdst = None
25 try:
26 fsrc = open(src, 'rb')
27 fdst = open(dst, 'wb')
28 while 1:
29 buf = fsrc.read(16384)
30 if not buf:
31 break
32 fdst.write(buf)
33 finally:
34 if fsrc:
35 fsrc.close()
36 if fdst:
37 fdst.close()
38 st = os.stat(src)
39 mode = stat.S_IMODE(st.st_mode)
40 if hasattr(os, 'utime'):
41 try:
42 os.utime(dst, (st.st_atime, st.st_mtime))
43 except OSError:
44 pass
46 def move(src, dst):
47 try:
48 os.rename(src, dst)
49 except OSError:
50 copy(src, dst)
51 os.unlink(src)
53 def test_splits(dir_list, allowedcodecs=None, targetcodec=None):
54 if targetcodec and allowdcodecs:
55 postadd = lambda type_: {} if type_ in allowedcodecs else {'type':targetcodec.type_, 'ext':targetcodec.ext}
56 else:
57 postadd = lambda type_: {}
58 for (dir_, files) in dir_list.items():
59 dstdirs = set()
60 for file_ in files:
61 src = file_.filename
62 dst = fsencode(file_.format(postadd=postadd(file_.type_)))
63 dstdir = os.path.split(dst)[0]
64 dstdirs.add(dstdir)
65 if len(dstdirs) > 1:
66 onsplit = Config['onsplit']
67 if onsplit == 'abort':
68 fatal(consoleformat=u"tracks in %(dir_p)s would be placed in different target directories, aborting\nset onsplit to 'warn' or 'ignore' to proceed anyway",
69 format="split: %(dir_)r", dir_=dir_, dir_p=fsdecode(dir_), nologerror=1)
71 def fsencode(string):
72 return string.encode(Config['fs_encoding'], Config.get('fs_encoding_err', 'underscorereplace'))
74 def fsdecode(string):
75 return string.decode(Config['fs_encoding'], Config.get('fs_encoding_err', 'replace'))
77 __all__ = ['copy', 'move', 'fsencode', 'fsdecode', 'test_splits']