This fixes LP: #819507, to make consume_userdata run 'always'
consume_userdata should really run always, rather than once per instance. The documentation says that boothooks were on their own for per-instance but since this routine was only being called once, they would only get called once. This modifies the behavior to be: user_script: per_always cloud_config : per_always upstart_job : per_instance cloud_boothook: per_always In order to not break part handlers that are existing, and expect to only be called once per instance, this adds a 'handler_version' item in a handler that can indicate the version (currently 1 or 2). If it is 2, then the hander will be passed the frequency (per-instance or per-always) that this is being run. That way the handler can differenciate between them. This also makes 'bootcmd' run every boot. That should be changable in cloud-config though, so users who dont like the behavior can modify it.
This commit is contained in:
parent
b5f300c46f
commit
fa55bf8ddb
@ -140,8 +140,10 @@ def main():
|
|||||||
|
|
||||||
# parse the user data (ec2-run-userdata.py)
|
# parse the user data (ec2-run-userdata.py)
|
||||||
try:
|
try:
|
||||||
cloud.sem_and_run("consume_userdata", "once-per-instance",
|
ran = cloud.sem_and_run("consume_userdata", cloudinit.per_instance,
|
||||||
cloud.consume_userdata,[],False)
|
cloud.consume_userdata,[cloudinit.per_instance],False)
|
||||||
|
if not ran:
|
||||||
|
cloud.consume_userdata(cloudinit.per_always)
|
||||||
except:
|
except:
|
||||||
warn("consuming user data failed!\n")
|
warn("consuming user data failed!\n")
|
||||||
raise
|
raise
|
||||||
|
@ -25,9 +25,9 @@ import os
|
|||||||
import subprocess
|
import subprocess
|
||||||
import time
|
import time
|
||||||
|
|
||||||
per_instance="once-per-instance"
|
per_instance= cloudinit.per_instance
|
||||||
per_always="always"
|
per_always = cloudinit.per_always
|
||||||
per_once="once"
|
per_once = cloudinit.per_once
|
||||||
|
|
||||||
class CloudConfig():
|
class CloudConfig():
|
||||||
cfgfile = None
|
cfgfile = None
|
||||||
|
@ -18,6 +18,8 @@
|
|||||||
import cloudinit.util as util
|
import cloudinit.util as util
|
||||||
import subprocess
|
import subprocess
|
||||||
import tempfile
|
import tempfile
|
||||||
|
from cloudinit.CloudConfig import per_always
|
||||||
|
frequency = per_always
|
||||||
|
|
||||||
def handle(name,cfg,cloud,log,args):
|
def handle(name,cfg,cloud,log,args):
|
||||||
if not cfg.has_key("bootcmd"):
|
if not cfg.has_key("bootcmd"):
|
||||||
|
@ -190,11 +190,10 @@ def preprocess_userdata(data):
|
|||||||
process_includes(email.message_from_string(decomp_str(data)),parts)
|
process_includes(email.message_from_string(decomp_str(data)),parts)
|
||||||
return(parts2mime(parts))
|
return(parts2mime(parts))
|
||||||
|
|
||||||
# callbacks is a dictionary with:
|
# callback is a function that will be called with (data, content_type, filename, payload)
|
||||||
# { 'content-type': handler(data,content_type,filename,payload) }
|
def walk_userdata(istr, callback, data = None):
|
||||||
def walk_userdata(str, callbacks, data = None):
|
|
||||||
partnum = 0
|
partnum = 0
|
||||||
for part in email.message_from_string(str).walk():
|
for part in email.message_from_string(istr).walk():
|
||||||
# multipart/* are just containers
|
# multipart/* are just containers
|
||||||
if part.get_content_maintype() == 'multipart':
|
if part.get_content_maintype() == 'multipart':
|
||||||
continue
|
continue
|
||||||
@ -207,8 +206,7 @@ def walk_userdata(str, callbacks, data = None):
|
|||||||
if not filename:
|
if not filename:
|
||||||
filename = 'part-%03d' % partnum
|
filename = 'part-%03d' % partnum
|
||||||
|
|
||||||
if callbacks.has_key(ctype):
|
callback(data, ctype, filename, part.get_payload())
|
||||||
callbacks[ctype](data,ctype,filename,part.get_payload())
|
|
||||||
|
|
||||||
partnum = partnum+1
|
partnum = partnum+1
|
||||||
|
|
||||||
|
@ -46,6 +46,10 @@ pathmap = {
|
|||||||
None : "",
|
None : "",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
per_instance="once-per-instance"
|
||||||
|
per_always="always"
|
||||||
|
per_once="once"
|
||||||
|
|
||||||
parsed_cfgs = { }
|
parsed_cfgs = { }
|
||||||
|
|
||||||
import os
|
import os
|
||||||
@ -63,6 +67,7 @@ import logging
|
|||||||
import logging.config
|
import logging.config
|
||||||
import StringIO
|
import StringIO
|
||||||
import glob
|
import glob
|
||||||
|
import traceback
|
||||||
|
|
||||||
class NullHandler(logging.Handler):
|
class NullHandler(logging.Handler):
|
||||||
def emit(self,record): pass
|
def emit(self,record): pass
|
||||||
@ -111,14 +116,16 @@ class CloudInit:
|
|||||||
ds_deps = [ DataSource.DEP_FILESYSTEM, DataSource.DEP_NETWORK ]
|
ds_deps = [ DataSource.DEP_FILESYSTEM, DataSource.DEP_NETWORK ]
|
||||||
datasource = None
|
datasource = None
|
||||||
|
|
||||||
|
builtin_handlers = [ ]
|
||||||
|
|
||||||
def __init__(self, ds_deps = None, sysconfig=system_config):
|
def __init__(self, ds_deps = None, sysconfig=system_config):
|
||||||
self.part_handlers = {
|
self.builtin_handlers = [
|
||||||
'text/x-shellscript' : self.handle_user_script,
|
[ 'text/x-shellscript', self.handle_user_script, per_always ],
|
||||||
'text/cloud-config' : self.handle_cloud_config,
|
[ 'text/cloud-config', self.handle_cloud_config, per_always ],
|
||||||
'text/upstart-job' : self.handle_upstart_job,
|
[ 'text/upstart-job', self.handle_upstart_job, per_instance ],
|
||||||
'text/part-handler' : self.handle_handler,
|
[ 'text/cloud-boothook', self.handle_cloud_boothook, per_always ],
|
||||||
'text/cloud-boothook' : self.handle_cloud_boothook
|
]
|
||||||
}
|
|
||||||
if ds_deps != None:
|
if ds_deps != None:
|
||||||
self.ds_deps = ds_deps
|
self.ds_deps = ds_deps
|
||||||
self.sysconfig=sysconfig
|
self.sysconfig=sysconfig
|
||||||
@ -249,7 +256,7 @@ class CloudInit:
|
|||||||
return("%s/%s.%s" % (get_cpath("sem"), name, freq))
|
return("%s/%s.%s" % (get_cpath("sem"), name, freq))
|
||||||
|
|
||||||
def sem_has_run(self,name,freq):
|
def sem_has_run(self,name,freq):
|
||||||
if freq == "always": return False
|
if freq == per_always: return False
|
||||||
semfile = self.sem_getpath(name,freq)
|
semfile = self.sem_getpath(name,freq)
|
||||||
if os.path.exists(semfile):
|
if os.path.exists(semfile):
|
||||||
return True
|
return True
|
||||||
@ -265,7 +272,7 @@ class CloudInit:
|
|||||||
if e.errno != errno.EEXIST:
|
if e.errno != errno.EEXIST:
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
if os.path.exists(semfile) and freq != "always":
|
if os.path.exists(semfile) and freq != per_always:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# race condition
|
# race condition
|
||||||
@ -294,7 +301,7 @@ class CloudInit:
|
|||||||
def sem_and_run(self,semname,freq,func,args=[],clear_on_fail=False):
|
def sem_and_run(self,semname,freq,func,args=[],clear_on_fail=False):
|
||||||
if self.sem_has_run(semname,freq):
|
if self.sem_has_run(semname,freq):
|
||||||
log.debug("%s already ran %s", semname, freq)
|
log.debug("%s already ran %s", semname, freq)
|
||||||
return
|
return False
|
||||||
try:
|
try:
|
||||||
if not self.sem_acquire(semname,freq):
|
if not self.sem_acquire(semname,freq):
|
||||||
raise Exception("Failed to acquire lock on %s" % semname)
|
raise Exception("Failed to acquire lock on %s" % semname)
|
||||||
@ -305,13 +312,15 @@ class CloudInit:
|
|||||||
self.sem_clear(semname,freq)
|
self.sem_clear(semname,freq)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
# get_ipath : get the instance path for a name in pathmap
|
# get_ipath : get the instance path for a name in pathmap
|
||||||
# (/var/lib/cloud/instances/<instance>/name)<name>)
|
# (/var/lib/cloud/instances/<instance>/name)<name>)
|
||||||
def get_ipath(self, name=None):
|
def get_ipath(self, name=None):
|
||||||
return("%s/instances/%s%s"
|
return("%s/instances/%s%s"
|
||||||
% (varlibdir,self.get_instance_id(), pathmap[name]))
|
% (varlibdir,self.get_instance_id(), pathmap[name]))
|
||||||
|
|
||||||
def consume_userdata(self):
|
def consume_userdata(self, frequency=per_instance):
|
||||||
self.get_userdata()
|
self.get_userdata()
|
||||||
data = self
|
data = self
|
||||||
|
|
||||||
@ -323,63 +332,40 @@ class CloudInit:
|
|||||||
sys.path.insert(0,cdir)
|
sys.path.insert(0,cdir)
|
||||||
sys.path.insert(0,idir)
|
sys.path.insert(0,idir)
|
||||||
|
|
||||||
|
part_handlers = { }
|
||||||
# add handlers in cdir
|
# add handlers in cdir
|
||||||
for fname in glob.glob("%s/*.py" % cdir):
|
for fname in glob.glob("%s/*.py" % cdir):
|
||||||
if not os.path.isfile(fname): continue
|
if not os.path.isfile(fname): continue
|
||||||
modname = os.path.basename(fname)[0:-3]
|
modname = os.path.basename(fname)[0:-3]
|
||||||
try:
|
try:
|
||||||
mod = __import__(modname)
|
mod = __import__(modname)
|
||||||
lister = getattr(mod, "list_types")
|
handler_register(mod, part_handlers, data, frequency)
|
||||||
handler = getattr(mod, "handle_part")
|
log.debug("added handler for [%s] from %s" % (mod.list_types(), fname))
|
||||||
mtypes = lister()
|
|
||||||
for mtype in mtypes:
|
|
||||||
self.part_handlers[mtype]=handler
|
|
||||||
log.debug("added handler for [%s] from %s" % (mtypes,fname))
|
|
||||||
except:
|
except:
|
||||||
log.warn("failed to initialize handler in %s" % fname)
|
log.warn("failed to initialize handler in %s" % fname)
|
||||||
util.logexc(log)
|
util.logexc(log)
|
||||||
|
|
||||||
# give callbacks opportunity to initialize
|
# add the internal handers if their type hasn't been already claimed
|
||||||
for ctype, func in self.part_handlers.items():
|
for (btype, bhand, bfreq) in self.builtin_handlers:
|
||||||
func(data, "__begin__",None,None)
|
if btype in part_handlers:
|
||||||
|
continue
|
||||||
|
handler_register(InternalPartHandler(bhand, [btype], bfreq),
|
||||||
|
part_handlers, data, frequency)
|
||||||
|
|
||||||
|
# walk the data
|
||||||
|
pdata = { 'handlers': part_handlers, 'handlerdir': idir,
|
||||||
|
'data' : data, 'frequency': frequency }
|
||||||
UserDataHandler.walk_userdata(self.get_userdata(),
|
UserDataHandler.walk_userdata(self.get_userdata(),
|
||||||
self.part_handlers, data)
|
partwalker_callback, data = pdata)
|
||||||
|
|
||||||
# give callbacks opportunity to finalize
|
# give callbacks opportunity to finalize
|
||||||
for ctype, func in self.part_handlers.items():
|
called = [ ]
|
||||||
func(data,"__end__",None,None)
|
for (mtype, mod) in part_handlers.iteritems():
|
||||||
|
if mod in called:
|
||||||
|
continue
|
||||||
|
handler_call_end(mod, data, frequency)
|
||||||
|
|
||||||
def handle_handler(self,data,ctype,filename,payload):
|
def handle_user_script(self,data,ctype,filename,payload, frequency):
|
||||||
if ctype == "__end__": return
|
|
||||||
if ctype == "__begin__" :
|
|
||||||
self.handlercount = 0
|
|
||||||
return
|
|
||||||
|
|
||||||
self.handlercount=self.handlercount+1
|
|
||||||
|
|
||||||
# write content to instance's handlerdir
|
|
||||||
handlerdir = self.get_ipath("handlers")
|
|
||||||
modname = 'part-handler-%03d' % self.handlercount
|
|
||||||
modfname = modname + ".py"
|
|
||||||
util.write_file("%s/%s" % (handlerdir,modfname), payload, 0600)
|
|
||||||
|
|
||||||
try:
|
|
||||||
mod = __import__(modname)
|
|
||||||
lister = getattr(mod, "list_types")
|
|
||||||
handler = getattr(mod, "handle_part")
|
|
||||||
except:
|
|
||||||
import traceback
|
|
||||||
traceback.print_exc(file=sys.stderr)
|
|
||||||
return
|
|
||||||
|
|
||||||
# - call it with '__begin__'
|
|
||||||
handler(data, "__begin__", None, None)
|
|
||||||
|
|
||||||
# - add it self.part_handlers
|
|
||||||
for mtype in lister():
|
|
||||||
self.part_handlers[mtype]=handler
|
|
||||||
|
|
||||||
def handle_user_script(self,data,ctype,filename,payload):
|
|
||||||
if ctype == "__end__": return
|
if ctype == "__end__": return
|
||||||
if ctype == "__begin__":
|
if ctype == "__begin__":
|
||||||
# maybe delete existing things here
|
# maybe delete existing things here
|
||||||
@ -390,7 +376,11 @@ class CloudInit:
|
|||||||
util.write_file("%s/%s" %
|
util.write_file("%s/%s" %
|
||||||
(scriptsdir,filename), util.dos2unix(payload), 0700)
|
(scriptsdir,filename), util.dos2unix(payload), 0700)
|
||||||
|
|
||||||
def handle_upstart_job(self,data,ctype,filename,payload):
|
def handle_upstart_job(self,data,ctype,filename,payload, frequency):
|
||||||
|
# upstart jobs are only written on the first boot
|
||||||
|
if frequency != per_instance:
|
||||||
|
return
|
||||||
|
|
||||||
if ctype == "__end__" or ctype == "__begin__": return
|
if ctype == "__end__" or ctype == "__begin__": return
|
||||||
if not filename.endswith(".conf"):
|
if not filename.endswith(".conf"):
|
||||||
filename=filename+".conf"
|
filename=filename+".conf"
|
||||||
@ -398,7 +388,7 @@ class CloudInit:
|
|||||||
util.write_file("%s/%s" % ("/etc/init",filename),
|
util.write_file("%s/%s" % ("/etc/init",filename),
|
||||||
util.dos2unix(payload), 0644)
|
util.dos2unix(payload), 0644)
|
||||||
|
|
||||||
def handle_cloud_config(self,data,ctype,filename,payload):
|
def handle_cloud_config(self,data,ctype,filename,payload, frequency):
|
||||||
if ctype == "__begin__":
|
if ctype == "__begin__":
|
||||||
self.cloud_config_str=""
|
self.cloud_config_str=""
|
||||||
return
|
return
|
||||||
@ -418,7 +408,7 @@ class CloudInit:
|
|||||||
|
|
||||||
self.cloud_config_str+="\n#%s\n%s" % (filename,payload)
|
self.cloud_config_str+="\n#%s\n%s" % (filename,payload)
|
||||||
|
|
||||||
def handle_cloud_boothook(self,data,ctype,filename,payload):
|
def handle_cloud_boothook(self,data,ctype,filename,payload, frequency):
|
||||||
if ctype == "__end__": return
|
if ctype == "__end__": return
|
||||||
if ctype == "__begin__": return
|
if ctype == "__begin__": return
|
||||||
|
|
||||||
@ -520,3 +510,81 @@ class DataSourceNotFoundException(Exception):
|
|||||||
|
|
||||||
def list_sources(cfg_list, depends):
|
def list_sources(cfg_list, depends):
|
||||||
return(DataSource.list_sources(cfg_list,depends, ["cloudinit", "" ]))
|
return(DataSource.list_sources(cfg_list,depends, ["cloudinit", "" ]))
|
||||||
|
|
||||||
|
def handler_register(mod, part_handlers, data, frequency=per_instance):
|
||||||
|
if not hasattr(mod, "handler_version"):
|
||||||
|
setattr(mod, "handler_version", 1)
|
||||||
|
|
||||||
|
for mtype in mod.list_types():
|
||||||
|
part_handlers[mtype] = mod
|
||||||
|
|
||||||
|
handler_call_begin(mod, data, frequency)
|
||||||
|
return(mod)
|
||||||
|
|
||||||
|
def handler_call_begin(mod, data, frequency):
|
||||||
|
handler_handle_part(mod, data, "__begin__", None, None, frequency)
|
||||||
|
|
||||||
|
def handler_call_end(mod, data, frequency):
|
||||||
|
handler_handle_part(mod, data, "__end__", None, None, frequency)
|
||||||
|
|
||||||
|
def handler_handle_part(mod, data, ctype, filename, payload, frequency):
|
||||||
|
# only add the handler if the module should run
|
||||||
|
modfreq = getattr(mod, "frequency", per_instance)
|
||||||
|
if not ( modfreq == per_always or
|
||||||
|
( frequency == per_instance and modfreq == per_instance)):
|
||||||
|
return
|
||||||
|
if mod.handler_version == 1:
|
||||||
|
mod.handle_part(data, ctype, filename, payload)
|
||||||
|
else:
|
||||||
|
mod.handle_part(data, ctype, filename, payload, frequency)
|
||||||
|
|
||||||
|
def partwalker_handle_handler(pdata, ctype, filename, payload):
|
||||||
|
|
||||||
|
curcount = pdata['handlercount']
|
||||||
|
modname = 'part-handler-%03d' % curcount
|
||||||
|
frequency = pdata['frequency']
|
||||||
|
|
||||||
|
modfname = modname + ".py"
|
||||||
|
util.write_file("%s/%s" % (pdata['handlerdir'], modfname), payload, 0600)
|
||||||
|
|
||||||
|
pdata['handlercount'] = curcount + 1
|
||||||
|
|
||||||
|
try:
|
||||||
|
mod = __import__(modname)
|
||||||
|
handler_register(mod, pdata['handlers'], pdata['data'], frequency)
|
||||||
|
except:
|
||||||
|
util.logexc(log)
|
||||||
|
traceback.print_exc(file=sys.stderr)
|
||||||
|
return
|
||||||
|
|
||||||
|
def partwalker_callback(pdata, ctype, filename, payload):
|
||||||
|
# data here is the part_handlers array and then the data to pass through
|
||||||
|
if ctype == "text/part-handler":
|
||||||
|
if 'handlercount' not in pdata:
|
||||||
|
pdata['handlercount'] = 0
|
||||||
|
partwalker_handle_handler(pdata, ctype, filename, payload)
|
||||||
|
return
|
||||||
|
if ctype not in pdata['handlers']:
|
||||||
|
return
|
||||||
|
handler_handle_part(pdata['handlers'][ctype], pdata['data'],
|
||||||
|
ctype, filename, payload, pdata['frequency'])
|
||||||
|
|
||||||
|
class InternalPartHandler:
|
||||||
|
freq = per_instance
|
||||||
|
mtypes = [ ]
|
||||||
|
handler_version = 1
|
||||||
|
handler = None
|
||||||
|
def __init__(self, handler, mtypes, frequency, version = 2):
|
||||||
|
self.handler = handler
|
||||||
|
self.mtypes = mtypes
|
||||||
|
self.frequency = frequency
|
||||||
|
self.handler_version = version
|
||||||
|
|
||||||
|
def __repr__():
|
||||||
|
return("InternalPartHandler: [%s]" % self.mtypes)
|
||||||
|
|
||||||
|
def list_types(self):
|
||||||
|
return(self.mtypes)
|
||||||
|
|
||||||
|
def handle_part(self, data, ctype, filename, payload, frequency):
|
||||||
|
return(self.handler(data, ctype, filename, payload, frequency))
|
||||||
|
Loading…
x
Reference in New Issue
Block a user