import re
import os
import sys
import time
import fcntl
import Queue
import json
import subprocess
import multiprocessing

import ansible
import ansible.utils
from ansible import callbacks
from bunch import Bunch
from setproctitle import setproctitle
from IPy import IP

import errors
import mockremote
from callback import FrontendCallback

ansible_playbook = "ansible-playbook"

try:
    import fedmsg
except ImportError:
    pass


def ans_extra_vars_encode(extra_vars, name):
    """ transform dict into --extra-vars="json string" """
    if not extra_vars:
        return ""
    return "--extra-vars='{{\"{0}\": {1}}}'".format(
        name, json.dumps(extra_vars))
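
# Illustrative example of the string built by ans_extra_vars_encode(); the
# dict contents are hypothetical, "copr_task" is the key the callers below
# actually use:
#
#   ans_extra_vars_encode({"chroot": "fedora-20-x86_64"}, "copr_task")
#   returns:
#   --extra-vars='{"copr_task": {"chroot": "fedora-20-x86_64"}}'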


class SilentPlaybookCallbacks(callbacks.PlaybookCallbacks):

    """ playbook callbacks - quietly! """

    def on_start(self):
        callbacks.call_callback_module("playbook_on_start")

    def on_notify(self, host, handler):
        callbacks.call_callback_module("playbook_on_notify", host, handler)

    def on_no_hosts_matched(self):
        callbacks.call_callback_module("playbook_on_no_hosts_matched")

    def on_no_hosts_remaining(self):
        callbacks.call_callback_module("playbook_on_no_hosts_remaining")

    def on_task_start(self, name, is_conditional):
        callbacks.call_callback_module(
            "playbook_on_task_start", name, is_conditional)

    def on_vars_prompt(self, varname,
                       private=True, prompt=None, encrypt=None,
                       confirm=False, salt_size=None, salt=None):

        result = None
        sys.stderr.write(
            "***** VARS_PROMPT WILL NOT BE RUN IN THIS KIND OF PLAYBOOK *****\n")

        callbacks.call_callback_module(
            "playbook_on_vars_prompt", varname, private=private,
            prompt=prompt, encrypt=encrypt, confirm=confirm,
            salt_size=salt_size, salt=None)

        return result

    def on_setup(self):
        callbacks.call_callback_module("playbook_on_setup")

    def on_import_for_host(self, host, imported_file):
        callbacks.call_callback_module(
            "playbook_on_import_for_host", host, imported_file)

    def on_not_import_for_host(self, host, missing_file):
        callbacks.call_callback_module(
            "playbook_on_not_import_for_host", host, missing_file)

    def on_play_start(self, pattern):
        callbacks.call_callback_module("playbook_on_play_start", pattern)

    def on_stats(self, stats):
        callbacks.call_callback_module("playbook_on_stats", stats)


class WorkerCallback(object):

    def __init__(self, logfile=None):
        self.logfile = logfile

    def log(self, msg):
        if self.logfile:
            now = time.strftime("%F %T")
            try:
                with open(self.logfile, 'a') as lf:
                    fcntl.flock(lf, fcntl.LOCK_EX)
                    lf.write(str(now) + ': ' + msg + '\n')
                    fcntl.flock(lf, fcntl.LOCK_UN)
            except (IOError, OSError) as e:
                sys.stderr.write("Could not write to logfile {0} - {1}\n"
                                 .format(self.logfile, str(e)))
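
# Each WorkerCallback.log() call appends one timestamped line to the worker's
# log file, e.g. (timestamp and message are hypothetical):
#   2014-01-01 12:00:00: worker finished build: 192.0.2.10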


class Worker(multiprocessing.Process):

    def __init__(self, opts, jobs, events, worker_num,
                 ip=None, create=True, callback=None, lock=None):

        # base class init
        multiprocessing.Process.__init__(self, name="worker-builder")

        # job management stuff
        self.jobs = jobs

        # event queue for communicating back to the dispatcher
        self.events = events
        self.worker_num = worker_num
        self.ip = ip
        self.vm_name = None
        self.opts = opts
        self.kill_received = False
        self.callback = callback
        self.create = create
        self.lock = lock
        self.frontend_callback = FrontendCallback(opts)
        if not self.callback:
            self.logfile = os.path.join(
                self.opts.worker_logdir,
                "worker-{0}.log".format(self.worker_num))
            self.callback = WorkerCallback(logfile=self.logfile)

        if ip:
            self.callback.log("creating worker: {0}".format(ip))
        else:
            self.callback.log("creating worker: dynamic ip")
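
    # Illustrative use (names and values below are hypothetical): the process
    # that dispatches builds would construct a worker with shared queues and
    # start it as a separate process:
    #
    #   w = Worker(opts, jobs_queue, events_queue, worker_num=1,
    #              ip="192.0.2.10")
    #   w.start()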

    def event(self, topic, template, content=None):
        """ Multi-purpose logging method.

        Logs messages to three different destinations:
            - the log file
            - the internal "events" queue for communicating back to the
              dispatcher
            - the fedmsg bus; messages are posted asynchronously to a
              zmq.PUB socket

        """

        content = content or {}
        what = template.format(**content)

        if self.ip:
            who = "worker-{0}-{1}".format(self.worker_num, self.ip)
        else:
            who = "worker-{0}".format(self.worker_num)

        self.callback.log("event: who: {0}, what: {1}".format(who, what))
        self.events.put({"when": time.time(), "who": who, "what": what})
        try:
            content["who"] = who
            content["what"] = what
            if self.opts.fedmsg_enabled:
                fedmsg.publish(modname="copr", topic=topic, msg=content)

        except Exception as e:
            self.callback.log("failed to publish message: {0}".format(e))
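
    # Illustrative record pushed onto the "events" queue (values are
    # hypothetical; "who" follows the worker-<num>-<ip> form built above):
    #
    #   {"when": 1388577600.0, "who": "worker-1-192.0.2.10",
    #    "what": "build start: user:someuser copr:myproject build:42 ..."}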

    def run_ansible_playbook(self, args, name, attempts=9):
        """
        call ansible playbook
            - well mostly we run out of space in OpenStack so we rather try
              multiple times (attempts param)
            - dump any attempt failure
        """

        # shell out to the ansible-playbook CLI
        command = "{0} {1}".format(ansible_playbook, args)

        for i in range(0, attempts):
            try:
                attempt_desc = ": retry: " if i else ": begin: "
                self.callback.log(name + attempt_desc + command)

                result = subprocess.check_output(command, shell=True)
                self.callback.log("Raw playbook output:\n{0}\n".format(result))
                break

            except subprocess.CalledProcessError as e:
                result = None
                self.callback.log("CalledProcessError: \n{0}\n".format(e.output))
                sys.stderr.write("{0}\n".format(e.output))

            time.sleep(self.opts.sleeptime)

        self.callback.log(name + ": end")
        return result
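
    # An illustrative command string as composed above (the playbook path and
    # extra-vars are hypothetical):
    #
    #   ansible-playbook -c ssh /path/to/spawn_playbook.yml \
    #       --extra-vars='{"copr_task": {"chroot": "fedora-20-x86_64"}}'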

    def spawn_instance(self, job):
        """call the spawn playbook to startup/provision a building instance"""

        start = time.time()

        extra_vars = {}
        if self.opts.spawn_vars:
            for i in self.opts.spawn_vars.split(","):
                if i == 'chroot':
                    extra_vars['chroot'] = job['chroot']

        arch = job['chroot'].split("-")[2]
        try:
            spawn_playbook = self.opts.spawn_playbook[arch]
        except KeyError:
            return None

        args = "-c ssh {0} {1}".format(
            spawn_playbook,
            ans_extra_vars_encode(extra_vars, "copr_task"))

        result = self.run_ansible_playbook(args, "spawning instance")
        if not result:
            return None

        match = re.search(r'IP=([^\{\}"]+)', result, re.MULTILINE)
        if not match:
            return None
        ipaddr = match.group(1)

        match = re.search(r'vm_name=([^\{\}"]+)', result, re.MULTILINE)
        if match:
            self.vm_name = match.group(1)

        self.callback.log("got instance ip: {0}".format(ipaddr))
        self.callback.log(
            "Instance spawn/provision took {0} sec".format(time.time() - start))

        if self.ip:
            return self.ip

        # validate the address the playbook handed back
        try:
            IP(ipaddr)
            return ipaddr
        except ValueError:
            self.callback.log(
                "No IP back from spawn_instance - dumping cache output")
            self.callback.log(str(result))
            self.callback.log("Test spawn_instance playbook manually")
            return None
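
    # The regexes above expect the spawn playbook output to contain lines of
    # roughly this shape (values are hypothetical):
    #
    #   IP=192.0.2.10
    #   vm_name=copr-builder-42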

    def terminate_instance(self, instance_ip):
        """call the terminate playbook to destroy the building instance"""

        term_args = {}
        if self.opts.terminate_vars:
            for i in self.opts.terminate_vars.split(","):
                if i == "ip":
                    term_args["ip"] = instance_ip
                if i == "vm_name":
                    term_args["vm_name"] = self.vm_name

        args = "-c ssh -i '{0},' {1} {2}".format(
            instance_ip, self.opts.terminate_playbook,
            ans_extra_vars_encode(term_args, "copr_task"))
        self.run_ansible_playbook(args, "terminate instance")

    def parse_job(self, jobfile):
        try:
            with open(jobfile) as jf:
                build = json.load(jf)
        except ValueError:
            # mangled json - let the caller log it and move on
            return None

        jobdata = Bunch()
        jobdata.pkgs = build["pkgs"].split(" ")
        jobdata.repos = [r for r in build["repos"].split(" ") if r.strip()]
        jobdata.chroot = build["chroot"]
        jobdata.buildroot_pkgs = build["buildroot_pkgs"]
        jobdata.memory_reqs = build["memory_reqs"]
        if build["timeout"]:
            jobdata.timeout = build["timeout"]
        else:
            jobdata.timeout = self.opts.timeout
        jobdata.destdir = os.path.normpath(
            os.path.join(self.opts.destdir,
                         build["copr"]["owner"]["name"],
                         build["copr"]["name"]))

        jobdata.build_id = build["id"]
        jobdata.results = os.path.join(
            self.opts.results_baseurl,
            build["copr"]["owner"]["name"],
            build["copr"]["name"] + "/")

        jobdata.copr_id = build["copr"]["id"]
        jobdata.user_id = build["user_id"]
        jobdata.user_name = build["copr"]["owner"]["name"]
        jobdata.copr_name = build["copr"]["name"]

        jobdata.pkg_version = ""
        jobdata.built_packages = ""

        return jobdata
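
    # parse_job() expects the jobfile to contain JSON roughly of this shape;
    # the values here are hypothetical, only the keys read above matter:
    #
    #   {"id": 42, "pkgs": "http://example.com/foo.src.rpm",
    #    "repos": "", "chroot": "fedora-20-x86_64",
    #    "buildroot_pkgs": "", "memory_reqs": 2048, "timeout": 3600,
    #    "user_id": 1,
    #    "copr": {"id": 7, "name": "myproject",
    #             "owner": {"name": "someuser"}}}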

    def post_to_frontend(self, data):
        """send data to frontend"""
        i = 10
        while i > 0:
            result = self.frontend_callback.post_to_frontend(data)
            if not result:
                self.callback.log(self.frontend_callback.msg)
                i -= 1
                time.sleep(5)
            else:
                i = 0
        return result

    def mark_started(self, job):
        build = {"id": job.build_id,
                 "started_on": job.started_on,
                 "results": job.results,
                 "chroot": job.chroot,
                 "status": 3,
                 }
        data = {"builds": [build]}

        if not self.post_to_frontend(data):
            raise errors.CoprWorkerError(
                "Could not communicate to front end to submit status info")

    def return_results(self, job):
        self.callback.log(
            "{0} status {1}. Took {2} seconds".format(
                job.build_id, job.status, job.ended_on - job.started_on))

        build = {
            "id": job.build_id,
            "ended_on": job.ended_on,
            "status": job.status,
            "chroot": job.chroot,
            "pkg_version": job.pkg_version,
            "built_packages": job.built_packages,
        }

        data = {"builds": [build]}

        if not self.post_to_frontend(data):
            raise errors.CoprWorkerError(
                "Could not communicate to front end to submit results")

        os.unlink(job.jobfile)
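
    # Build status codes as used in this module when talking to the frontend:
    # run() starts each job with status 1, drops it to 0 on any failure, sets
    # 5 when mockremote reports the packages as skipped, and mark_started()
    # posts 3 while the build is still in progress.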

    def run(self):
        """
        Worker should startup and check if it can function
        for each job it takes from the jobs queue
        run opts.spawn_playbook to create the instance
        do the build (mockremote)
        terminate the instance.
        """

        setproctitle("worker {0}".format(self.worker_num))
        while not self.kill_received:
            try:
                jobfile = self.jobs.get()
            except Queue.Empty:
                break

            # parse the job json into job info
            job = self.parse_job(jobfile)

            if job is None:
                self.callback.log(
                    'jobfile {0} is mangled, please investigate'.format(
                        jobfile))

                time.sleep(self.opts.sleeptime)
                continue

            job.jobfile = jobfile

            # spin up a build instance if this worker is responsible for it
            if self.create:
                try:
                    ip = self.spawn_instance(job)
                    if not ip:
                        raise errors.CoprWorkerError(
                            "No IP found from creating instance")

                except ansible.errors.AnsibleError as e:
                    self.callback.log(
                        "failure to setup instance: {0}".format(e))

                    raise

            try:

                try:
                    if self.opts.fedmsg_enabled:
                        fedmsg.init(
                            name="relay_inbound",
                            cert_prefix="copr",
                            active=True)

                except Exception as e:
                    self.callback.log(
                        "failed to initialize fedmsg: {0}".format(e))

                status = 1
                job.started_on = time.time()
                self.mark_started(job)

                template = "build start: user:{user} copr:{copr}" \
                    " build:{build} ip:{ip} pid:{pid}"

                content = dict(user=job.user_name, copr=job.copr_name,
                               build=job.build_id, ip=ip, pid=self.pid)
                self.event("build.start", template, content)

                template = "chroot start: chroot:{chroot} user:{user}" \
                    " copr:{copr} build:{build} ip:{ip} pid:{pid}"

                content = dict(chroot=job.chroot, user=job.user_name,
                               copr=job.copr_name, build=job.build_id,
                               ip=ip, pid=self.pid)

                self.event("chroot.start", template, content)

                chroot_destdir = os.path.normpath(
                    job.destdir + '/' + job.chroot)

                # make sure the chroot results dir exists locally
                if not os.path.exists(chroot_destdir):
                    try:
                        os.makedirs(chroot_destdir)
                    except (OSError, IOError) as e:
                        msg = "Could not make results dir" \
                              " for job: {0} - {1}".format(chroot_destdir,
                                                           str(e))

                        self.callback.log(msg)
                        status = 0

                if status == 1:

                    self.callback.log("Starting build: id={0} builder={1}"
                                      " timeout={2} destdir={3}"
                                      " chroot={4} repos={5}".format(
                                          job.build_id, ip,
                                          job.timeout, job.destdir,
                                          job.chroot, str(job.repos)))

                    self.callback.log("building pkgs: {0}".format(
                        ' '.join(job.pkgs)))

                    try:
                        chroot_repos = list(job.repos)
                        chroot_repos.append(job.results + '/' + job.chroot)
                        chrootlogfile = "{0}/build-{1}.log".format(
                            chroot_destdir, job.build_id)

                        macros = {
                            "copr_username": job.user_name,
                            "copr_projectname": job.copr_name,
                            "vendor": "Fedora Project COPR ({0}/{1})".format(
                                job.user_name, job.copr_name)
                        }

                        mr = mockremote.MockRemote(
                            builder=ip,
                            timeout=job.timeout,
                            destdir=job.destdir,
                            chroot=job.chroot,
                            cont=True,
                            recurse=True,
                            repos=chroot_repos,
                            macros=macros,
                            lock=self.lock,
                            buildroot_pkgs=job.buildroot_pkgs,
                            callback=mockremote.CliLogCallBack(
                                quiet=True, logfn=chrootlogfile))

                        skipped, build_details = mr.build_pkgs(job.pkgs)

                        if skipped:
                            status = 5

                        job.update(build_details)

                    except mockremote.MockRemoteError as e:
                        # log the error and mark this job as failed
                        self.callback.log("{0} - {1}".format(ip, e))
                        status = 0
                    else:
                        # the remote build ran to the end; check whether
                        # mockremote recorded any failures
                        if mr.failed:
                            status = 0

                    self.callback.log(
                        "Finished build: id={0} builder={1}"
                        " timeout={2} destdir={3}"
                        " chroot={4} repos={5}".format(
                            job.build_id, ip,
                            job.timeout, job.destdir,
                            job.chroot, str(job.repos)))

                job.ended_on = time.time()

                job.status = status
                self.return_results(job)
                self.callback.log("worker finished build: {0}".format(ip))

                template = "build end: user:{user} copr:{copr} build:{build}" \
                    " ip:{ip} pid:{pid} status:{status}"

                content = dict(user=job.user_name, copr=job.copr_name,
                               build=job.build_id, ip=ip, pid=self.pid,
                               status=job.status, chroot=job.chroot)
                self.event("build.end", template, content)

            finally:
                # always clean up the instance we spun up for this job
                if self.create:
                    self.terminate_instance(ip)