Package backend :: Module dispatcher
[hide private]
[frames] | [no frames]

Source Code for Module backend.dispatcher

  1  import re 
  2  import os 
  3  import sys 
  4  import time 
  5  import fcntl 
  6  import Queue 
  7  import json 
  8  import subprocess 
  9  import multiprocessing 
 10   
 11  import ansible 
 12  import ansible.utils 
 13  from ansible import callbacks 
 14  from bunch import Bunch 
 15  from setproctitle import setproctitle 
 16  from IPy import IP 
 17   
 18  import errors 
 19  import mockremote 
 20  from callback import FrontendCallback 
 21   
# Name (or path) of the ansible-playbook executable; invoked via the shell
# by Worker.run_ansible_playbook to spawn/terminate build instances.
ansible_playbook="ansible-playbook" 

# fedmsg publishing is optional; Worker.event() only touches fedmsg when
# opts.fedmsg_enabled is set, so a missing module is tolerated here.
try: 
    import fedmsg 
except ImportError: 
    pass  # fedmsg is optional 
 28   
def ans_extra_vars_encode(extra_vars, name):
    """ transform dict into --extra-vars="json string" """
    if not extra_vars:
        return ""
    # Wrap the serialized dict under a single top-level key so the playbook
    # receives it as {"<name>": {...}}.
    payload = json.dumps(extra_vars)
    return "--extra-vars='{{\"{0}\": {1}}}'".format(name, payload)
35
class SilentPlaybookCallbacks(callbacks.PlaybookCallbacks):

    """ playbook callbacks - quietly!

    Mirrors ansible's PlaybookCallbacks interface but produces no console
    output: every hook only forwards to registered callback plugins via
    callbacks.call_callback_module().
    """

    def __init__(self, verbose=False):
        super(SilentPlaybookCallbacks, self).__init__()
        # kept for interface compatibility; not consulted by these hooks
        self.verbose = verbose

    def on_start(self):
        """Forward the playbook-start hook to callback plugins."""
        callbacks.call_callback_module("playbook_on_start")

    def on_notify(self, host, handler):
        """Forward a handler-notify event for *host*."""
        callbacks.call_callback_module("playbook_on_notify", host, handler)

    def on_no_hosts_matched(self):
        """Forward the no-hosts-matched event."""
        callbacks.call_callback_module("playbook_on_no_hosts_matched")

    def on_no_hosts_remaining(self):
        """Forward the no-hosts-remaining event."""
        callbacks.call_callback_module("playbook_on_no_hosts_remaining")

    def on_task_start(self, name, is_conditional):
        """Forward the start of task *name*."""
        callbacks.call_callback_module(
            "playbook_on_task_start", name, is_conditional)

    def on_vars_prompt(self, varname,
                       private=True, prompt=None, encrypt=None,
                       confirm=False, salt_size=None, salt=None):
        """Refuse interactive vars_prompt; still notify callback plugins.

        Returns None so the playbook never blocks waiting for input.
        """
        result = None
        sys.stderr.write(
            "***** VARS_PROMPT WILL NOT BE RUN IN THIS KIND OF PLAYBOOK *****\n")

        # BUGFIX: forward the caller's ``salt`` value; previously a literal
        # None was passed regardless of the ``salt`` argument.
        callbacks.call_callback_module(
            "playbook_on_vars_prompt", varname, private=private,
            prompt=prompt, encrypt=encrypt, confirm=confirm,
            salt_size=salt_size, salt=salt)

        return result

    def on_setup(self):
        """Forward the setup-phase event."""
        callbacks.call_callback_module("playbook_on_setup")

    def on_import_for_host(self, host, imported_file):
        """Forward a successful vars-file import for *host*."""
        callbacks.call_callback_module(
            "playbook_on_import_for_host", host, imported_file)

    def on_not_import_for_host(self, host, missing_file):
        """Forward a skipped (missing) vars-file import for *host*."""
        callbacks.call_callback_module(
            "playbook_on_not_import_for_host", host, missing_file)

    def on_play_start(self, pattern):
        """Forward the start of a play matching *pattern*."""
        callbacks.call_callback_module("playbook_on_play_start", pattern)

    def on_stats(self, stats):
        """Forward end-of-run statistics."""
        callbacks.call_callback_module("playbook_on_stats", stats)
class WorkerCallback(object):

    """Timestamped, flock-guarded append-only logger for a Worker.

    When constructed without a logfile, log() is a silent no-op.
    """

    def __init__(self, logfile=None):
        # Path of the file log() appends to; None disables logging.
        self.logfile = logfile

    def log(self, msg):
        """Append *msg* with a timestamp to the logfile.

        The write is serialized with an exclusive flock so multiple worker
        processes can share one logfile. Failures are reported on stderr
        rather than raised - logging must never kill the worker.
        """
        if self.logfile:
            now = time.strftime("%F %T")
            try:
                with open(self.logfile, 'a') as lf:
                    fcntl.flock(lf, fcntl.LOCK_EX)
                    lf.write(str(now) + ': ' + msg + '\n')
                    fcntl.flock(lf, fcntl.LOCK_UN)
            # MODERNIZED: "except (...), e" -> "except (...) as e"
            # (same semantics; valid on Python 2.6+ and Python 3)
            except (IOError, OSError) as e:
                sys.stderr.write("Could not write to logfile {0} - {1}\n"
                                 .format(self.logfile, str(e)))
class Worker(multiprocessing.Process):

    """Build worker process.

    Takes job files from the shared ``jobs`` queue, optionally spawns a
    build instance via an ansible playbook, runs the build through
    mockremote, reports status to the frontend, publishes events on the
    ``events`` queue (and fedmsg when enabled), then terminates the
    instance.
    """

    def __init__(self, opts, jobs, events, worker_num,
                 ip=None, create=True, callback=None, lock=None):
        # opts       -- backend configuration (Bunch-like; worker_logdir,
        #               sleeptime, playbook paths, fedmsg_enabled, ...)
        # jobs       -- queue of jobfile paths to build
        # events     -- queue for reporting back to the dispatcher
        # worker_num -- ordinal used in process/log naming
        # ip         -- fixed builder IP; None means dynamically spawned
        # create     -- whether this worker spawns/terminates instances
        # callback   -- logger; a WorkerCallback is created when omitted
        # lock       -- shared lock handed through to mockremote

        # base class initialization
        multiprocessing.Process.__init__(self, name="worker-builder")

        # job management stuff
        self.jobs = jobs
        # event queue for communicating back to dispatcher
        self.events = events
        self.worker_num = worker_num
        self.ip = ip
        # name of the spawned VM, parsed out of the spawn playbook output
        self.vm_name = None
        self.opts = opts
        self.kill_received = False
        self.callback = callback
        self.create = create
        self.lock = lock
        self.frontend_callback = FrontendCallback(opts)
        if not self.callback:
            self.logfile = os.path.join(
                self.opts.worker_logdir,
                "worker-{0}.log".format(self.worker_num))
            self.callback = WorkerCallback(logfile=self.logfile)

        if ip:
            self.callback.log("creating worker: {0}".format(ip))
        else:
            self.callback.log("creating worker: dynamic ip")

    def event(self, topic, template, content=None):
        """ Multi-purpose logging method.

        Logs messages to two different destinations:
            - To log file
            - The internal "events" queue for communicating back to the
              dispatcher.
            - The fedmsg bus. Messages are posted asynchronously to a
              zmq.PUB socket.

        """

        content = content or {}
        what = template.format(**content)

        # identify the sender; include the fixed IP when this worker has one
        if self.ip:
            who = "worker-{0}-{1}".format(self.worker_num, self.ip)
        else:
            who = "worker-{0}".format(self.worker_num)

        self.callback.log("event: who: {0}, what: {1}".format(who, what))
        self.events.put({"when": time.time(), "who": who, "what": what})
        try:
            content["who"] = who
            content["what"] = what
            if self.opts.fedmsg_enabled:
                fedmsg.publish(modname="copr", topic=topic, msg=content)
        # pylint: disable=W0703
        except Exception, e:
            # XXX - Maybe log traceback as well with traceback.format_exc()
            self.callback.log("failed to publish message: {0}".format(e))

    def run_ansible_playbook(self, args, name="running playbook", attempts=3):
        """
        call ansible playbook
            - well mostly we run out of space in OpenStack so we rather try
              multiple times (attempts param)
            - dump any attempt failure

        Returns the raw playbook output on success, None when every
        attempt failed.
        """

        # Ansible playbook python API does not work here, dunno why. See:
        # https://groups.google.com/forum/#!topic/ansible-project/DNBD2oHv5k8

        command="{0} {1}".format(ansible_playbook, args)

        for i in range(0, attempts):
            try:
                attempt_desc = ": retry: " if i else ": begin: "
                self.callback.log(name + attempt_desc + command)

                result = subprocess.check_output(command, shell=True)
                self.callback.log("Raw playbook output:\n{0}\n".format(result))
                break

            except subprocess.CalledProcessError as e:
                result = None
                self.callback.log("CalledProcessError: \n{0}\n".format(e.output))
                sys.stderr.write("{0}\n".format(e.output))
                # FIXME: this is not purpose of opts.sleeptime
                time.sleep(self.opts.sleeptime)

        self.callback.log(name + ": end")
        return result

    def spawn_instance(self, job):
        """call the spawn playbook to startup/provision a building instance

        Returns the builder's IP address, or None when the playbook failed
        or produced no usable IP.
        """

        start = time.time()

        # Ansible playbook python API does not work here, dunno why. See:
        # https://groups.google.com/forum/#!topic/ansible-project/DNBD2oHv5k8

        extra_vars = {}
        if self.opts.spawn_vars:
            for i in self.opts.spawn_vars.split(","):
                if i == 'chroot':
                    extra_vars['chroot'] = job['chroot']

        # chroot names look like "<os>-<version>-<arch>"; pick the arch to
        # select the per-arch spawn playbook
        arch = job['chroot'].split("-")[2]
        try:
            spawn_playbook = self.opts.spawn_playbook[arch]
        except KeyError:
            # no playbook configured for this arch -- cannot spawn
            return None

        args = "-c ssh {0} {1}".format(
            spawn_playbook,
            ans_extra_vars_encode(extra_vars, "copr_task"))

        result = self.run_ansible_playbook(args, "spawning instance")
        if not result:
            return None

        # the playbook is expected to print IP=... and vm_name=... lines
        match = re.search(r'IP=([^\{\}"]+)', result, re.MULTILINE)
        if not match:
            return None
        ipaddr = match.group(1)

        match = re.search(r'vm_name=([^\{\}"]+)', result, re.MULTILINE)
        if match:
            self.vm_name = match.group(1)

        self.callback.log("got instance ip: {0}".format(ipaddr))
        self.callback.log(
            "Instance spawn/provision took {0} sec".format(time.time() - start))

        # a worker pinned to a fixed IP always reports that IP
        if self.ip:
            return self.ip

        # for i in play.SETUP_CACHE:
        #    if i =="localhost":
        #        continue
        #    return i

        # validate that the parsed string really is an IP address
        try:
            IP(ipaddr)
            return ipaddr
        except ValueError:
            # if we get here we"re in trouble
            self.callback.log(
                "No IP back from spawn_instance - dumping cache output")
            self.callback.log(str(result))
            self.callback.log("Test spawn_instance playbook manually")
            return None

    def terminate_instance(self, instance_ip):
        """call the terminate playbook to destroy the building instance"""

        term_args = {}
        if self.opts.terminate_vars:
            for i in self.opts.terminate_vars.split(","):
                if i == "ip":
                    term_args["ip"] = instance_ip
                if i == "vm_name":
                    term_args["vm_name"] = self.vm_name

        # "-i '<ip>,'" is ansible's inline single-host inventory syntax
        args = "-c ssh -i '{0},' {1} {2}".format(
            instance_ip, self.opts.terminate_playbook,
            ans_extra_vars_encode(term_args, "copr_task"))
        self.run_ansible_playbook(args, "terminate instance")

    def parse_job(self, jobfile):
        """Read the job JSON file and return a Bunch with build info.

        Returns None when the file cannot be parsed (e.g. empty/mangled).
        """
        # read the json of the job in
        # break out what we need return a bunch of the info we need
        # NOTE(review): the file handle from open() is never closed
        # explicitly -- relies on garbage collection
        try:
            build = json.load(open(jobfile))
        except ValueError:
            # empty file?
            return None
        jobdata = Bunch()
        jobdata.pkgs = build["pkgs"].split(" ")
        jobdata.repos = [r for r in build["repos"].split(" ") if r.strip()]
        jobdata.chroot = build["chroot"]
        jobdata.buildroot_pkgs = build["buildroot_pkgs"]
        jobdata.memory_reqs = build["memory_reqs"]
        # fall back to the backend-wide timeout when the job has none
        if build["timeout"]:
            jobdata.timeout = build["timeout"]
        else:
            jobdata.timeout = self.opts.timeout
        jobdata.destdir = os.path.normpath(
            os.path.join(self.opts.destdir,
                         build["copr"]["owner"]["name"],
                         build["copr"]["name"]))

        jobdata.build_id = build["id"]
        jobdata.results = os.path.join(
            self.opts.results_baseurl,
            build["copr"]["owner"]["name"],
            build["copr"]["name"] + "/")

        jobdata.copr_id = build["copr"]["id"]
        jobdata.user_id = build["user_id"]
        jobdata.user_name = build["copr"]["owner"]["name"]
        jobdata.copr_name = build["copr"]["name"]

        jobdata.pkg_version = ""
        jobdata.built_packages = ""

        return jobdata

    # maybe we move this to the callback?
    def post_to_frontend(self, data):
        """send data to frontend

        Retries up to 10 times with a 5 second pause; returns the frontend
        result, which is falsy when every attempt failed.
        """
        i = 10
        while i > 0:
            result = self.frontend_callback.post_to_frontend(data)
            if not result:
                self.callback.log(self.frontend_callback.msg)
                i -= 1
                time.sleep(5)
            else:
                i = 0
        return result

    # maybe we move this to the callback?
    def mark_started(self, job):
        """Tell the frontend that *job* switched to the running state.

        Raises CoprWorkerError when the frontend cannot be reached.
        """
        build = {"id": job.build_id,
                 "started_on": job.started_on,
                 "results": job.results,
                 "chroot": job.chroot,
                 "status": 3,  # running
                 }
        data = {"builds": [build]}

        if not self.post_to_frontend(data):
            raise errors.CoprWorkerError(
                "Could not communicate to front end to submit status info")

    # maybe we move this to the callback?
    def return_results(self, job):
        """Report the finished *job* to the frontend and remove its jobfile.

        Raises CoprWorkerError when the frontend cannot be reached.
        """
        self.callback.log(
            "{0} status {1}. Took {2} seconds".format(
                job.build_id, job.status, job.ended_on - job.started_on))

        build = {
            "id": job.build_id,
            "ended_on": job.ended_on,
            "status": job.status,
            "chroot": job.chroot,
            "pkg_version": job.pkg_version,
            "built_packages": job.built_packages,
        }

        data = {"builds": [build]}

        if not self.post_to_frontend(data):
            raise errors.CoprWorkerError(
                "Could not communicate to front end to submit results")

        # job fully reported -- drop its jobfile from the queue directory
        os.unlink(job.jobfile)

    def run(self):
        """
        Worker should startup and check if it can function
        for each job it takes from the jobs queue
        run opts.setup_playbook to create the instance
        do the build (mockremote)
        terminate the instance.
        """

        setproctitle("worker {0}".format(self.worker_num))
        while not self.kill_received:
            # NOTE(review): jobs.get() blocks with no timeout, so
            # Queue.Empty is never actually raised here -- TODO confirm
            # whether a non-blocking get was intended
            try:
                jobfile = self.jobs.get()
            except Queue.Empty:
                break

            # parse the job json into our info
            job = self.parse_job(jobfile)

            if job is None:
                self.callback.log(
                    'jobfile {0} is mangled, please investigate'.format(
                        jobfile))
                time.sleep(self.opts.sleeptime)
                continue

            # FIXME
            # this is our best place to sanity check the job before starting
            # up any longer process

            job.jobfile = jobfile

            # spin up our build instance
            # NOTE(review): when create=False the local ``ip`` is never
            # assigned before being used below -- presumably such workers
            # are always constructed with a fixed ip; verify against callers
            if self.create:
                try:
                    ip = self.spawn_instance(job)
                    if not ip:
                        raise errors.CoprWorkerError(
                            "No IP found from creating instance")

                except ansible.errors.AnsibleError, e:
                    self.callback.log(
                        "failure to setup instance: {0}".format(e))
                    raise

            try:
                # This assumes there are certs and a fedmsg config on disk
                try:
                    if self.opts.fedmsg_enabled:
                        fedmsg.init(
                            name="relay_inbound",
                            cert_prefix="copr",
                            active=True)

                except Exception, e:
                    self.callback.log(
                        "failed to initialize fedmsg: {0}".format(e))

                status = 1  # succeeded
                job.started_on = time.time()
                self.mark_started(job)

                template = "build start: user:{user} copr:{copr}" \
                    " build:{build} ip:{ip} pid:{pid}"

                content = dict(user=job.user_name, copr=job.copr_name,
                               build=job.build_id, ip=ip, pid=self.pid)
                self.event("build.start", template, content)

                template = "chroot start: chroot:{chroot} user:{user}" \
                    "copr:{copr} build:{build} ip:{ip} pid:{pid}"

                content = dict(chroot=job.chroot, user=job.user_name,
                               copr=job.copr_name, build=job.build_id,
                               ip=ip, pid=self.pid)

                self.event("chroot.start", template, content)

                chroot_destdir = os.path.normpath(
                    job.destdir + '/' + job.chroot)

                # setup our target dir locally
                if not os.path.exists(chroot_destdir):
                    try:
                        os.makedirs(chroot_destdir)
                    except (OSError, IOError), e:
                        msg = "Could not make results dir" \
                              " for job: {0} - {1}".format(chroot_destdir,
                                                           str(e))

                        self.callback.log(msg)
                        status = 0  # fail

                if status == 1:  # succeeded
                    # FIXME
                    # need a plugin hook or some mechanism to check random
                    # info about the pkgs
                    # this should use ansible to download the pkg on the remote system
                    # and run a series of checks on the package before we
                    # start the build - most importantly license checks.

                    self.callback.log("Starting build: id={0} builder={1}"
                                      " timeout={2} destdir={3}"
                                      " chroot={4} repos={5}".format(
                                          job.build_id, ip,
                                          job.timeout, job.destdir,
                                          job.chroot, str(job.repos)))

                    self.callback.log("building pkgs: {0}".format(
                        ' '.join(job.pkgs)))

                    try:
                        # the build results repo itself is added so later
                        # packages can depend on earlier ones
                        chroot_repos = list(job.repos)
                        chroot_repos.append(job.results + '/' + job.chroot)
                        chrootlogfile = "{0}/build-{1}.log".format(
                            chroot_destdir, job.build_id)

                        macros = {
                            "copr_username": job.user_name,
                            "copr_projectname": job.copr_name,
                            "vendor": "Fedora Project COPR ({0}/{1})".format(
                                job.user_name, job.copr_name)
                        }

                        mr = mockremote.MockRemote(
                            builder=ip,
                            timeout=job.timeout,
                            destdir=job.destdir,
                            chroot=job.chroot,
                            cont=True,
                            recurse=True,
                            repos=chroot_repos,
                            macros=macros,
                            lock=self.lock,
                            buildroot_pkgs=job.buildroot_pkgs,
                            callback=mockremote.CliLogCallBack(
                                quiet=True, logfn=chrootlogfile))

                        skipped, build_details = mr.build_pkgs(job.pkgs)

                        if skipped:
                            status = 5  # skipped

                        job.update(build_details)

                    except mockremote.MockRemoteError, e:
                        # record and break
                        self.callback.log("{0} - {1}".format(ip, e))
                        status = 0  # failure
                    else:
                        # we can"t really trace back if we just fail normally
                        # check if any pkgs didn"t build
                        if mr.failed:
                            status = 0  # failure

                    self.callback.log(
                        "Finished build: id={0} builder={1}"
                        " timeout={2} destdir={3}"
                        " chroot={4} repos={5}".format(
                            job.build_id, ip,
                            job.timeout, job.destdir,
                            job.chroot, str(job.repos)))

                job.ended_on = time.time()

                job.status = status
                self.return_results(job)
                self.callback.log("worker finished build: {0}".format(ip))
                template = "build end: user:{user} copr:{copr} build:{build}" \
                    " ip:{ip} pid:{pid} status:{status}"

                content = dict(user=job.user_name, copr=job.copr_name,
                               build=job.build_id, ip=ip, pid=self.pid,
                               status=job.status, chroot=job.chroot)
                self.event("build.end", template, content)

            finally:
                # clean up the instance
                if self.create:
                    self.terminate_instance(ip)
557