1# -*- coding: utf-8 -*-
2#
3# Copyright 2015 Michael Lotz
4# Copyright 2016 Jerome Duval
5# Copyright 2017-2020 Haiku, Inc. All rights reserved.
6# Distributed under the terms of the MIT License.
7
8# -- Modules ------------------------------------------------------------------
9
10import json
11import logging
12import os
13import subprocess
14import threading
15import time
16
17from .Builders.Builder import BuilderState
18from .Builders.LocalBuilder import LocalBuilder
19from .Builders.RemoteBuilderSSH import RemoteBuilderSSH
20from .Configuration import Configuration
21from .Options import getOption
22from .Port import Port
23from .ReporterJson import ReporterJson
24from .ReporterMongo import ReporterMongo
25from .Utils import ensureCommandIsAvailable, info, sysExit, warn
26
27
class ThreadFilter(object):
	"""Record filter that only lets records of a single thread pass.

	The accepted thread is the one that created the filter or that called
	reset() last. While a build dict is attached through setBuild(), its
	'lines' entry is incremented for every accepted record.
	"""

	def __init__(self):
		self.ident = threading.current_thread().ident
		self.build = None

	def reset(self):
		"""Bind the filter to the thread that is calling this method."""
		self.ident = threading.current_thread().ident

	def setBuild(self, build):
		"""Attach the dict whose 'lines' counter is updated by filter()."""
		self.build = build

	def filter(self, record):
		"""Return True if the calling thread is the one the filter is bound to.

		The record itself is not inspected; only the current thread matters.
		"""
		if threading.current_thread().ident != self.ident:
			return False

		if self.build:
			# count every line that passes for the attached build
			self.build['lines'] += 1
		return True
43
44
class ScheduledBuild(object):
	"""A port that is queued for building, plus its dependency bookkeeping.

	Attributes:
		port: the port to build.
		recipeFilePath: recipe path relative to the ports tree.
		resultingPackages: hpkg names of the packages the port produces.
		requiredPackages: paths of the dependency packages that are present.
		requiredPackageIDs: base names of the entries in requiredPackages.
		missingPackageIDs: set of package IDs that still have to be built
			before this build can start.
		buildNumbers: build numbers of all attempts made for this build.
		lost: True once a required package turned out to be unavailable.
	"""

	def __init__(self, port, portsTreePath, requiredPackageIDs, packagesPath,
		presentDependencyPackages):
		self.port = port
		self.recipeFilePath \
			= os.path.relpath(port.recipeFilePath, portsTreePath)
		self.resultingPackages \
			= [package.hpkgName for package in port.packages]
		self.packagesPath = packagesPath
		# Work on a copy: packageCompleted() appends to this list and must
		# not modify the list owned by the caller.
		self.requiredPackages = list(presentDependencyPackages)
		self.requiredPackageIDs = [
			os.path.basename(path) for path in self.requiredPackages]
		self.missingPackageIDs = set(requiredPackageIDs)
		self.buildNumbers = []
		self.lost = False

	@property
	def buildable(self):
		"""True when no required package is missing anymore."""
		return not self.missingPackageIDs

	def packageCompleted(self, package, available):
		"""Update the bookkeeping after the build of a package has ended.

		Packages this build is not waiting for are ignored. If the package
		is available it moves from the missing to the required packages,
		otherwise the build is marked as lost.
		"""
		packageID = package.versionedName
		if packageID not in self.missingPackageIDs:
			return

		if not available:
			self.lost = True
			return

		self.missingPackageIDs.remove(packageID)
		self.requiredPackageIDs.append(package.hpkgName)
		self.requiredPackages.append(
			os.path.join(self.packagesPath, package.hpkgName))

	@property
	def status(self):
		"""Dict describing this build, made of plain lists, strings and bools
		as long as the port attributes are plain values themselves."""
		return {
			'port': {
				'name': self.port.name,
				'version': self.port.version,
				'revision': self.port.revision,
				'revisionedName': self.port.revisionedName,
				'recipeFilePath': self.recipeFilePath
			},
			'resultingPackages': self.resultingPackages,
			'requiredPackages': sorted(self.requiredPackageIDs),
			'missingPackageIDs': sorted(self.missingPackageIDs),
			'buildable': self.buildable,
			'buildNumbers': self.buildNumbers,
			'lost': self.lost
		}
93
94
class SkippedBuild(object):
	"""A build that was skipped, together with the reason for skipping it.

	The port may be given either as a Port object or as anything else (the
	callers in this file pass the port name in that case); in the latter
	case only the name is recorded.
	"""

	def __init__(self, portsTreePath, port, reason):
		self.reason = reason

		if not isinstance(port, Port):
			# no Port object at hand, all we have is what was passed in
			self.port = None
			self.name = port
			self.recipeFilePath = ''
			self.resultingPackages = []
			return

		self.port = port
		self.recipeFilePath \
			= os.path.relpath(port.recipeFilePath, portsTreePath)
		self.resultingPackages \
			= [package.hpkgName for package in port.packages]

	@property
	def status(self):
		"""Dict describing the skipped build; version and revision are empty
		strings when no Port object is known."""
		if self.port:
			portInfo = {
				'name': self.port.name,
				'version': self.port.version,
				'revision': self.port.revision,
				'revisionedName': self.port.revisionedName,
				'recipeFilePath': self.recipeFilePath
			}
		else:
			portInfo = {
				'name': self.name,
				'version': '',
				'revision': '',
				'revisionedName': self.name,
				'recipeFilePath': self.recipeFilePath
			}

		return {
			'port': portInfo,
			'resultingPackages': self.resultingPackages,
			'reason': self.reason
		}
125
126
class BuildRecord(object):
	"""Result of one build attempt.

	The duration is measured at construction time as the time elapsed since
	startTime, so the record has to be created right when the build ends.
	"""

	def __init__(self, scheduledBuild, startTime, buildSuccess, builderId):
		self.duration = time.time() - startTime
		self.startTime = startTime
		self.port = scheduledBuild.port
		self.buildNumbers = scheduledBuild.buildNumbers
		self.builderId = builderId
		self.buildSuccess = buildSuccess

	@property
	def status(self):
		"""Dict describing the record (port identity, timing and outcome)."""
		port = self.port
		portInfo = {
			'name': port.name,
			'version': port.version,
			'revision': port.revision,
			'revisionedName': port.revisionedName
		}
		return {
			'port': portInfo,
			'buildNumbers': self.buildNumbers,
			'startTime': self.startTime,
			'duration': self.duration,
			'buildSuccess': self.buildSuccess,
			'builderId': self.builderId
		}
151
152
class BuildMaster(object):
	"""Schedules port builds and hands them to local or remote builders.

	Builds are registered with schedule() (or addSkipped()) and executed by
	runBuilds(), which starts one thread per build on the next available
	builder. The current state is exposed through the status and summary
	properties and pushed to the reporter whenever it changes.
	"""

	def __init__(self, portsTreePath, packagesPath, options):
		"""Set up directories, logging, the reporter and the builders.

		portsTreePath: path of the ports tree; git is queried there for the
			origin URL and the HEAD revision.
		packagesPath: directory of the packages; passed to every builder.
		options: passed through to each LocalBuilder.

		Raises Exception if the 'systemPackagesDirectory' option is not set.
		Calls sysExit() if the reporter cannot be set up or if no builder
		could be created.
		"""
		self.portsTreePath = portsTreePath
		self._fillPortsTreeInfo()

		self.activeBuilders = []
		self.reconnectingBuilders = []
		self.lostBuilders = []
		self.availableBuilders = []
		self.packagesPath = packagesPath
		self.masterBaseDir = os.path.realpath('buildmaster')
		self.builderBaseDir = os.path.join(self.masterBaseDir, 'builders')
		self.buildOutputBaseDir = getOption('buildMasterOutputDir')
		if self.buildOutputBaseDir:
			self.buildOutputBaseDir = os.path.realpath(self.buildOutputBaseDir)
		else:
			self.buildOutputBaseDir = os.path.join(self.masterBaseDir, 'output')

		if not os.path.isdir(self.buildOutputBaseDir):
			os.makedirs(self.buildOutputBaseDir)

		self.buildRecordsDir = os.path.join(self.buildOutputBaseDir, 'records')
		if not os.path.isdir(self.buildRecordsDir):
			os.makedirs(self.buildRecordsDir)

		self.buildStatus = None
		self.buildNumberFile = os.path.join(self.masterBaseDir, 'buildnumber')
		self.buildNumber = 0
		try:
			with open(self.buildNumberFile, 'r') as buildNumberFile:
				self.buildNumber = int(buildNumberFile.read())
		except (IOError, OSError, ValueError):
			# Best effort: a missing or unreadable build number file simply
			# means that numbering starts at 0.
			pass

		# Prevents the 'haiku package requirement not met' error
		# These system packages are uploaded to every builder and used as the
		# base set of packages for builds
		if not getOption('systemPackagesDirectory'):
			raise Exception('Error: Must provide --system-packages-directory flag in build-master'
				' mode for global builder package solving')

		self.localBuilders = getOption('localBuilders')
		self.remoteAvailable = False

		print('Local builders count: ' + str(self.localBuilders))

		logHandler = logging.FileHandler(
			os.path.join(self.buildOutputBaseDir, 'master.log'))
		logHandler.setFormatter(logging.Formatter('%(asctime)s: %(message)s'))

		self.logger = logging.getLogger('buildMaster')
		self.logger.setLevel(logging.DEBUG)
		self.logger.addHandler(logHandler)

		self.logger.info('portstree head is at ' + self.portsTreeHead)

		# Setup our reporting engine. The reporter stays None (no reporting)
		# if a reporting URI with an unsupported scheme is configured.
		self.reporter = None
		reportURI = Configuration.getReportingURI()
		if reportURI is None:
			reportFile = os.path.join(self.buildOutputBaseDir, 'status.json')
			info("Reporting to " + reportFile)
			self.reporter = ReporterJson(reportFile, "master",
				Configuration.getTargetArchitecture())
			if not self.reporter.connected():
				sysExit('unable to setup json reporting engine')
		elif reportURI.startswith("mongodb://"):
			self.reporter = ReporterMongo(reportURI, "master",
				Configuration.getTargetArchitecture())
			if not self.reporter.connected():
				sysExit('unable to connect to reporting engine @ ' + reportURI)

		if self.localBuilders == 0:
			# Every regular file in the builders directory is taken as the
			# configuration of one remote builder.
			for fileName in os.listdir(self.builderBaseDir):
				configFilePath = os.path.join(self.builderBaseDir, fileName)
				if not os.path.isfile(configFilePath):
					continue

				try:
					builder = RemoteBuilderSSH(configFilePath, packagesPath,
						self.buildOutputBaseDir, self.portsTreeOriginURL,
						self.portsTreeHead)
				except Exception as exception:
					self.logger.error('failed to add builder from config '
						+ configFilePath + ':' + str(exception))
					continue

				self.remoteAvailable = True
				self.activeBuilders.append(builder)
		else:
			logger = logging.getLogger("buildLogger")
			# Iterate over a copy: removeHandler() modifies logger.handlers,
			# and removing from the list being iterated skips handlers.
			for handler in list(logger.handlers):
				logger.removeHandler(handler)
			for i in range(0, self.localBuilders):
				try:
					builder = LocalBuilder(str(i), packagesPath,
						self.buildOutputBaseDir, options)
				except Exception as exception:
					self.logger.error('failed to add local builder: '
						+ str(exception))
					continue

				self.activeBuilders.append(builder)

		if len(self.activeBuilders) == 0:
			sysExit('no builders available')

		self.availableBuilders += self.activeBuilders

		self.scheduledBuilds = []
		self.activeBuilds = []
		self.blockedBuilds = []
		self.completeBuilds = []
		self.failedBuilds = []
		self.lostBuilds = []
		self.skippedBuilds = []
		self.buildHistory = []
		self.totalBuildCount = 0
		self.startTime = None
		self.endTime = None
		# ring buffer of the samples taken by the summary property
		self.impulseData = [None] * 500
		self.impulseIndex = -1
		self.display = None

		self.buildableCondition = threading.Condition()
			# protects the scheduled builds lists
		self.builderCondition = threading.Condition()
			# protects the builders lists
		self.statusLock = threading.Lock()

		self._setBuildStatus('preparing')

	def addSkipped(self, port, reason):
		"""Record that a port is skipped and report the new status.

		port is either a Port object or a port name.
		"""
		portName = port.revisionedName if isinstance(port, Port) else port
		warn('skipped port {}: {}'.format(portName, reason))

		skippedBuild = SkippedBuild(self.portsTreePath, port, reason)
		self.skippedBuilds.append(skippedBuild)
		self._reportStatus()

	def schedule(self, port, requiredPackageIDs, presentDependencyPackages):
		"""Queue a port for building.

		The port is skipped if one of its packages already exists in the
		packages directory. Otherwise it is added to the scheduled builds if
		no required package is missing, and to the blocked builds if some
		are.
		"""
		# Skip builds that would overwrite existing packages.
		for package in port.packages:
			packagePath = os.path.join(self.packagesPath, package.hpkgName)
			if not os.path.exists(packagePath):
				continue

			self.addSkipped(port, 'some packages already exist at '
				+ self.packagesPath + ', revision bump required')
			return

		self.logger.info('scheduling build of ' + port.versionedName)
		scheduledBuild = ScheduledBuild(port, self.portsTreePath,
			requiredPackageIDs, self.packagesPath, presentDependencyPackages)

		if scheduledBuild.buildable:
			self.scheduledBuilds.append(scheduledBuild)
		else:
			self.blockedBuilds.append(scheduledBuild)

		self._setBuildStatus('scheduling')

	def runBuilds(self, stdscr=None):
		"""Run all scheduled builds and block until they have finished.

		stdscr: if given, a Display is created on it to show the progress
			(presumably a curses window -- confirm against Display).

		The final build status is 'complete', 'aborted' (KeyboardInterrupt)
		or 'failed: <message>' (any other Exception).
		"""
		# Move anything to the lost state that depends on skipped builds.
		for skippedBuild in self.skippedBuilds:
			if skippedBuild.port:
				self._packagesCompleted(skippedBuild.port.packages, False)

		try:
			if stdscr:
				from .Display import Display
				self.display = Display(stdscr, len(self.activeBuilders))

			self._ensureConsistentSchedule()
			self.totalBuildCount = len(self.scheduledBuilds) + len(self.blockedBuilds)
			self.startTime = time.time()
			self._setBuildStatus('starting builds')
			if self.display:
				self.display.updateSummary(self.summary)
			while True:
				self._runBuilds()
				self._waitForBuildsToComplete()
				# builds that ended with a transient error were rescheduled
				# in the meantime and need another round
				if len(self.scheduledBuilds) == 0:
					break

			exitStatus = 'complete'
		except KeyboardInterrupt:
			exitStatus = 'aborted'
		except Exception as exception:
			self.logger.error(str(exception))
			exitStatus = 'failed: ' + str(exception)

		self.logger.info('finished with status: ' + exitStatus)
		self.endTime = time.time()
		self._setBuildStatus(exitStatus)

	def _fillPortsTreeInfo(self):
		"""Determine origin URL and HEAD revision of the ports tree via git.

		Both values fall back to '<unknown>' if they cannot be determined.
		"""
		try:
			ensureCommandIsAvailable('git')
			origin = subprocess.check_output(['git', 'remote', 'get-url',
					'origin'], cwd=self.portsTreePath, stderr=subprocess.STDOUT).decode('utf-8')
			head = subprocess.check_output(['git', 'rev-parse', 'HEAD'],
				cwd=self.portsTreePath, stderr=subprocess.STDOUT).decode('utf-8')
		except (Exception, SystemExit):
			# SystemExit is still caught on purpose: the availability check
			# presumably bails out through sysExit() -- confirm in Utils.
			# KeyboardInterrupt is no longer swallowed.
			warn('unable to determine origin and revision of haikuports tree')
			origin = '<unknown>'
			head = '<unknown>'

		# strip the line ending of the git output (and nothing else)
		self.portsTreeOriginURL = origin.rstrip('\r\n')
		self.portsTreeHead = head.rstrip('\r\n')

	def _runBuilds(self):
		"""Start scheduled builds until nothing is scheduled or blocked.

		While only blocked builds are left, this waits for running builds to
		make them buildable (or lost).
		"""
		while True:
			buildToRun = None
			with self.buildableCondition:
				if len(self.scheduledBuilds) > 0:
					buildToRun = self.scheduledBuilds.pop(0)
					self.activeBuilds.append(buildToRun)
				elif len(self.blockedBuilds) > 0:
					if self.buildStatus != 'waiting for packages':
						self.logger.info('nothing buildable, waiting for packages')
					self._setBuildStatus('waiting for packages')
					if self.display:
						self.display.updateSummary(self.summary)
						self.display.updateBuilders(self.status)
					# wake up at least once per second to refresh the display
					self.buildableCondition.wait(1)
					continue
				else:
					break

			self._runBuild(buildToRun)

	def _waitForBuildsToComplete(self):
		"""Block until every active builder is available again."""
		while True:
			with self.builderCondition:
				if len(self.availableBuilders) == len(self.activeBuilders):
					break

				if self.display:
					self.display.updateSummary(self.summary)
					self.display.updateBuilders(self.status)

				self._setBuildStatus('waiting for all builds to complete')
				self.builderCondition.wait(1)

	def _getBuildNumber(self):
		"""Return the next build number and persist the incremented counter."""
		buildNumber = self.buildNumber
		self.buildNumber += 1
		self._persistBuildNumber()
		return buildNumber

	def _runBuild(self, scheduledBuild):
		"""Start the build on its own thread, waiting for a builder if needed.

		Calls sysExit() if no active builder is left.
		"""
		while True:
			builder = None
			buildNumber = -1
			with self.builderCondition:
				if len(self.activeBuilders) == 0:
					self._setBuildStatus('all builders lost')
					sysExit('all builders lost')

				if len(self.availableBuilders) == 0:
					self._setBuildStatus('waiting for available builders')
					if self.display:
						self.display.updateSummary(self.summary)
						self.display.updateBuilders(self.status)
					self.builderCondition.wait(1)
					continue

				builder = self.availableBuilders.pop(0)
				buildNumber = self._getBuildNumber()

			# the thread is started without holding the builder condition
			threading.Thread(None, self._buildThread,
				'build ' + str(buildNumber),
				(builder, scheduledBuild, buildNumber)).start()
			break

	def _persistBuildNumber(self):
		"""Write the current build number to the build number file."""
		with open(self.buildNumberFile, 'w') as buildNumberFile:
			buildNumberFile.write(str(self.buildNumber))

	def _packagesCompleted(self, packages, available):
		"""Tell all blocked builds that the given packages are done.

		Blocked builds that become buildable move to the scheduled builds.
		Blocked builds that are lost move to the lost builds and their own
		packages are processed as lost, too, so the loss propagates to
		everything depending on them.
		"""
		# work on a copy, the list grows while lost builds are propagated
		completePackages = list(packages)
		with self.buildableCondition:
			notify = False

			while len(completePackages) > 0:
				package = completePackages.pop(0)
				self.logger.info('package ' + package.versionedName + ' '
					+ ('became available' if available else 'lost'))

				stillBlockedBuilds = []
				for blockedBuild in self.blockedBuilds:
					blockedBuild.packageCompleted(package, available)
					if blockedBuild.buildable or blockedBuild.lost:
						notify = True
						self.logger.info('scheduled build '
							+ blockedBuild.port.versionedName + ' '
							+ ('became buildable' if available else 'lost'))

						if blockedBuild.buildable:
							self.scheduledBuilds.append(blockedBuild)
						else:
							# the build was lost, propagate lost packages
							self.lostBuilds.append(blockedBuild)
							completePackages += blockedBuild.port.packages
					else:
						stillBlockedBuilds.append(blockedBuild)

				self.blockedBuilds = stillBlockedBuilds

			if notify:
				self.buildableCondition.notify()

	def _buildComplete(self, scheduledBuild, buildSuccess, listToUse):
		"""Move a build from the active builds to listToUse and propagate the
		availability (or loss) of its packages to the blocked builds."""
		with self.buildableCondition:
			if scheduledBuild in self.activeBuilds:
				self.activeBuilds.remove(scheduledBuild)
			listToUse.append(scheduledBuild)

		self._packagesCompleted(scheduledBuild.port.packages, buildSuccess)

	def _buildThread(self, builder, scheduledBuild, buildNumber):
		"""Thread body: run one build on one builder and file the result.

		A failed build that the builder asks to reschedule goes back to the
		scheduled builds. Any other result is written as a JSON build record
		and the build is moved to the complete or failed builds. Finally the
		builder is made available again, unless it is lost or reconnecting.
		"""
		# NOTE(review): an exception raised by the builder is not handled
		# here; it would end the thread without handing the builder back.
		self.logger.info('starting build ' + str(buildNumber) + ', '
			+ scheduledBuild.port.versionedName + ' on builder '
			+ builder.name)

		scheduledBuild.buildNumbers.append(buildNumber)

		builder.setBuild(scheduledBuild, buildNumber)
		self._reportStatus()
		startTime = time.time()

		(buildSuccess, reschedule) = builder.runBuild()

		builder.unsetBuild()

		self.logger.info('build ' + str(buildNumber) + ', '
			+ scheduledBuild.port.versionedName + ' '
			+ ('succeeded' if buildSuccess else 'failed'))

		if not buildSuccess and reschedule:
			self.logger.info('transient error, rescheduling build')
			with self.buildableCondition:
				self.activeBuilds.remove(scheduledBuild)
				self.scheduledBuilds.append(scheduledBuild)
				self.buildableCondition.notify()
		else:
			record = BuildRecord(scheduledBuild, startTime, buildSuccess,
				builder.name)

			with open(os.path.join(self.buildRecordsDir,
					str(buildNumber) + '.json'), 'w') as outputFile:
				outputFile.write(json.dumps(record.status))

			self.buildHistory.append(record)
			if self.display:
				self.display.updateHistory(self.buildHistory)

			self._buildComplete(scheduledBuild, buildSuccess,
				self.completeBuilds if buildSuccess else self.failedBuilds)

		with self.builderCondition:
			if builder.state == BuilderState.LOST:
				self.logger.error('builder ' + builder.name + ' lost')
				self.activeBuilders.remove(builder)
				self.lostBuilders.append(builder)
			elif builder.state == BuilderState.RECONNECT:
				self.logger.error(
					'builder ' + builder.name + ' is reconnecting')
				self.activeBuilders.remove(builder)
				self.reconnectingBuilders.append(builder)
			else:
				self.availableBuilders.append(builder)

			self.builderCondition.notify()

		self._reportStatus()

	def _ensureConsistentSchedule(self):
		"""Move blocked builds that can never become buildable to the lost
		builds.

		A blocked build is broken if it waits for a package that none of the
		scheduled or blocked builds produces. Broken builds and, through
		propagation, everything depending on them end up in the lost builds
		exactly once.
		"""
		buildingPackagesIDs = set()
		for scheduledBuild in self.scheduledBuilds + self.blockedBuilds:
			for package in scheduledBuild.port.packages:
				buildingPackagesIDs.add(package.versionedName)

		brokenBuilds = []
		for blockedBuild in self.blockedBuilds:
			for missingPackageID in blockedBuild.missingPackageIDs:
				if missingPackageID not in buildingPackagesIDs:
					self.logger.error('missing package ' + missingPackageID
						+ ' of blocked build ' + blockedBuild.port.versionedName
						+ ' is not scheduled')
					brokenBuilds.append(blockedBuild)
					break

		for brokenBuild in brokenBuilds:
			if brokenBuild in self.lostBuilds:
				# already lost through the propagation of another broken
				# build, which took care of its packages as well
				continue

			# Take the build out of the blocked builds first, otherwise the
			# propagation of a later loss could add it to the lost builds a
			# second time.
			if brokenBuild in self.blockedBuilds:
				self.blockedBuilds.remove(brokenBuild)
			self._buildComplete(brokenBuild, False, self.lostBuilds)

		self.blockedBuilds = [blockedBuild
			for blockedBuild in self.blockedBuilds
			if blockedBuild not in self.lostBuilds]

	@property
	def status(self):
		"""Dict with the full state: the status of all builds and builders
		grouped by state, plus build number, ports tree and timing info."""
		return {
			'builds': {
				'active': [build.status for build in self.activeBuilds],
				'scheduled': [build.status for build in self.scheduledBuilds],
				'blocked': [build.status for build in self.blockedBuilds],
				'complete': [build.status for build in self.completeBuilds],
				'failed': [build.status for build in self.failedBuilds],
				'lost': [build.status for build in self.lostBuilds],
				'skipped': [build.status for build in self.skippedBuilds]
			},
			'builders': {
				'active': [builder.status for builder in self.activeBuilders
						if builder.currentBuild is not None],
				'reconnecting':
					[builder.status for builder in self.reconnectingBuilders],
				'idle': [builder.status for builder in self.activeBuilders
						if builder.currentBuild is None],
				'lost': [builder.status for builder in self.lostBuilders]
			},
			'nextBuildNumber': self.buildNumber,
			'portsTreeOriginURL': self.portsTreeOriginURL,
			'portsTreeHead': self.portsTreeHead,
			'buildStatus': self.buildStatus,
			'startTime': self.startTime,
			'endTime': self.endTime
		}

	@property
	def summary(self):
		"""Dict with build/builder counts and throughput figures.

		Note that reading this property has a side effect: it stores a
		sample (time, finished package count) in the impulse ring buffer.
		'impulse' is the packages-per-hour rate since the sample that
		previously occupied the slot, 'pkg_hour' the rate since the start.
		Both are None when they cannot be computed.
		"""
		self.impulseIndex += 1
		if self.impulseIndex >= len(self.impulseData):
			self.impulseIndex = 0
		previousSample = self.impulseData[self.impulseIndex]

		now = time.time()
		pkgCount = len(self.completeBuilds) + len(self.failedBuilds)
		self.impulseData[self.impulseIndex] = {
			'time': now,
			'pkgCount': pkgCount
		}

		duration = (now - self.startTime) if self.startTime else None

		# no rate can be derived from an interval that is not positive
		pkgPerHour = None
		if duration is not None and duration > 0:
			pkgPerHour = int(pkgCount * 3600 / duration)

		# A previous sample with a package count of 0 is a valid sample,
		# only an empty slot means that there is nothing to compare with.
		impulse = None
		if previousSample is not None:
			impulseDuration = now - previousSample['time']
			if impulseDuration > 0:
				impulse = int((pkgCount - previousSample['pkgCount']) * 3600
					/ impulseDuration)

		return {
			'builds': {
				'active': len(self.activeBuilds),
				'scheduled': len(self.scheduledBuilds),
				'blocked': len(self.blockedBuilds),
				'complete': len(self.completeBuilds),
				'failed': len(self.failedBuilds),
				'lost': len(self.lostBuilds),
				'total': self.totalBuildCount
			},
			'builders': {
				'active': len(self.activeBuilders),
				'lost': len(self.lostBuilders),
				'total': len(self.activeBuilders) + len(self.lostBuilders)
			},
			'duration': duration,
			'pkg_hour': pkgPerHour,
			'impulse': impulse
		}

	def _setBuildStatus(self, buildStatus):
		"""Set the overall build status and report it."""
		self.buildStatus = buildStatus
		self._reportStatus()

	def _reportStatus(self):
		"""Push the current status to the reporter, if there is one."""
		if not self.reporter:
			return
		with self.statusLock:
			self.reporter.updateBuildrun(self.buildNumber, self.status)
632