共计 12671 个字符,预计需要花费 32 分钟才能阅读完成。
原文参考地址a-continuous-integration-system,本文基本上跟翻译差不多,当然也会解释下其工作原理,英文看得不爽就看看这篇
原来给了一个git管理代码仓库的例子,在测试代码仓库出现任何commit的时候会自动更新代码并执行相应的测试,这样的话可以实现自动化管理,减少了人力操作
实现步骤:
(1)初始化
初始化仓库test_repo:
$ mkdir test_repo
$ cd test_repo
$ git init
test_repo是我们master分支上的代码仓库,现在要copy几个文件到当前的仓库中并且commit
$ cp -r /this/directory/tests /path/to/test_repo/
$ cd /path/to/test\_repo
$ git add tests/
$ git commit -m ”add tests”
test_repo_clone_obs仓库是为了监控master分支数据建立的中间仓库:
$ git clone /path/to/test_repo test_repo_clone_obs
test_repo_clone_runner
是为了测试,假设当前的master分支出现新的提交,则自动同步数据然后执行相应的测试软件,所以在此之前需要同步一下master分支上的代码:
$ git clone /path/to/test_repo test_repo_clone_runner
实时获取当前代码仓库的情况代码如下
import argparse
import os
import re
import socket
import SocketServer
import subprocess
import sys
import time
import helpers
def poll():
parser = argparse.ArgumentParser()
parser.add_argument("--dispatcher-server",
help="dispatcher host:port, " \
"by default it uses localhost:8888",
default="localhost:8888",
action="store")
parser.add_argument("repo", metavar="REPO", type=str,
help="~/test_repo")
args = parser.parse_args()
print args
dispatcher_host, dispatcher_port = args.dispatcher_server.split(":")
while True:
try:
# call the bash script that will update the repo and check
# for changes. If there's a change, it will drop a .commit_id file
# with the latest commit in the current working directory
subprocess.check_output(["./update_repo.sh", args.repo])
except subprocess.CalledProcessError as e:
raise Exception("Could not update and check repository. Reason: %s" % e.output)
parse是为了解析命令行参数,具体你可以查看上一篇文章python argparse用法
总共两个参数一个时端口,另外一个是master代码仓库地址
然后开启子进程执行update_repo.sh脚本,该脚本的代码如下
#!/bin/bash
source run_or_fail.sh
# delete previous id
rm -f .commit_id
# go to repo and update it to given commit
run_or_fail "Repository folder not found!" pushd 1 1> /dev/null
run_or_fail "Could not reset git" git reset --hard HEAD
# get the most recent commit
COMMIT=(run_or_fail "Could not call 'git log' on repository" git log -n1)
if [ ? != 0 ]; then
echo "Could not call 'git log' on repository"
exit 1
fi
# get its id
COMMIT_ID=`echoCOMMIT | awk '{ print 2 }'`
# update the repo
run_or_fail "Could not pull from repository" git pull
# get the most recent commit
COMMIT=(run_or_fail "Could not call 'git log' on repository" git log -n1)
if [ ? != 0 ]; then
echo "Could not call 'git log' on repository"
exit 1
fi
# get its id
NEW_COMMIT_ID=`echoCOMMIT | awk '{ print 2 }'`
# if the id changed, then write it to a file
if [NEW_COMMIT_ID != COMMIT_ID ]; then
popd 1> /dev/null
echoNEW_COMMIT_ID > .commit_id
fi
监听的方式是先同步,然后观察commit id 与没同步之前的id是否相同,如果相同则当前master仓库没有任何更新,如果不一样则出现的新的代码更新
如果出现更新则将当前的commit id 写入到文件当中,文件的名字是.commit_id
如果存在这个文件则需要通知dispatcher模块操作,通知代码都是通过socket实现的
代码如下:
if os.path.isfile(".commit_id"):
# great, we have a change! let's execute the tests
# First, check the status of the dispatcher server to see
# if we can send the tests
try:
response = helpers.communicate(dispatcher_host,
int(dispatcher_port),
"status")
except socket.error as e:
raise Exception("Could not communicate with dispatcher server: %s" % e)
if response == "OK":
# Dispatcher is present, let's send it a test
commit = ""
with open(".commit_id", "r") as f:
commit = f.readline()
response = helpers.communicate(dispatcher_host,
int(dispatcher_port),
"dispatch:%s" % commit)
if response != "OK":
raise Exception("Could not dispatch the test: %s" %
response)
print "dispatched!"
else:
# Something wrong happened to the dispatcher
raise Exception("Could not dispatch the test: %s" %
response)
time.sleep(5)
当存在记录commit id文件是就会向dispatcher发送消息,然后等待dispatcher返回消息
2017-7-26 23:07深夜继续撰写
之前已经提到dispatcher这个东西,它在这个集成系统中占据很重要的位置,可以说是枢纽,oberserver 和test runner都会与它有通信
"""
This is the test dispatcher.
It will dispatch tests against any registered test runners when the repo
observer sends it a 'dispatch' message with the commit ID to be used. It
will store results when the test runners have completed running the tests and
send back the results in a 'results' messagee
It can register as many test runners as you like. To register a test runner,
be sure the dispatcher is started, then start the test runner.
"""
import argparse
import os
import re
import socket
import SocketServer
import time
import threading
import helpers
# Shared dispatcher code
def dispatch_tests(server, commit_id):
# NOTE: usually we don't run this forever
while True:
print "trying to dispatch to runners"
for runner in server.runners:
response = helpers.communicate(runner["host"],
int(runner["port"]),
"runtest:%s" % commit_id)
if response == "OK":
print "adding id %s" % commit_id
server.dispatched_commits[commit_id] = runner
if commit_id in server.pending_commits:
server.pending_commits.remove(commit_id)
return
time.sleep(2)
class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
runners = [] # Keeps track of test runner pool
dead = False # Indicate to other threads that we are no longer running
dispatched_commits = {} # Keeps track of commits we dispatched
pending_commits = [] # Keeps track of commits we have yet to dispatch
class DispatcherHandler(SocketServer.BaseRequestHandler):
"""
The RequestHandler class for our dispatcher.
This will dispatch test runners against the incoming commit
and handle their requests and test results
"""
command_re = re.compile(r"(\w+)(:.+)*")
BUF_SIZE = 1024
def handle(self):
# self.request is the TCP socket connected to the client
self.data = self.request.recv(self.BUF_SIZE).strip()
command_groups = self.command_re.match(self.data)
if not command_groups:
self.request.sendall("Invalid command")
return
command = command_groups.group(1)
if command == "status":
print "in status"
self.request.sendall("OK")
elif command == "register":
# Add this test runner to our pool
print "register"
address = command_groups.group(2)
host, port = re.findall(r":(\w*)", address)
runner = {"host": host, "port":port}
self.server.runners.append(runner)
self.request.sendall("OK")
elif command == "dispatch":
print "going to dispatch"
commit_id = command_groups.group(2)[1:]
if not self.server.runners:
self.request.sendall("No runners are registered")
else:
# The coordinator can trust us to dispatch the test
self.request.sendall("OK")
dispatch_tests(self.server, commit_id)
elif command == "results":
print "got test results"
results = command_groups.group(2)[1:]
results = results.split(":")
commit_id = results[0]
length_msg = int(results[1])
# 3 is the number of ":" in the sent command
remaining_buffer = self.BUF_SIZE - (len(command) + len(commit_id) + len(results[1]) + 3)
if length_msg > remaining_buffer:
self.data += self.request.recv(length_msg - remaining_buffer).strip()
del self.server.dispatched_commits[commit_id]
if not os.path.exists("test_results"):
os.makedirs("test_results")
with open("test_results/%s" % commit_id, "w") as f:
data = self.data.split(":")[3:]
data = "\n".join(data)
f.write(data)
self.request.sendall("OK")
else:
self.request.sendall("Invalid command")
def serve():
parser = argparse.ArgumentParser()
parser.add_argument("--host",
help="dispatcher's host, by default it uses localhost",
default="localhost",
action="store")
parser.add_argument("--port",
help="dispatcher's port, by default it uses 8888",
default=8888,
action="store")
args = parser.parse_args()
# Create the server
server = ThreadingTCPServer((args.host, int(args.port)), DispatcherHandler)
print 'serving on %s:%s' % (args.host, int(args.port))
# Create a thread to check the runner pool
def runner_checker(server):
def manage_commit_lists(runner):
for commit, assigned_runner in server.dispatched_commits.iteritems():
if assigned_runner == runner:
del server.dispatched_commits[commit]
server.pending_commits.append(commit)
break
server.runners.remove(runner)
while not server.dead:
time.sleep(1)
for runner in server.runners:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
response = helpers.communicate(runner["host"],
int(runner["port"]),
"ping")
if response != "pong":
print "removing runner %s" % runner
manage_commit_lists(runner)
except socket.error as e:
manage_commit_lists(runner)
# this will kick off tests that failed
def redistribute(server):
while not server.dead:
for commit in server.pending_commits:
print "running redistribute"
print server.pending_commits
dispatch_tests(server, commit)
time.sleep(5)
runner_heartbeat = threading.Thread(target=runner_checker, args=(server,))
redistributor = threading.Thread(target=redistribute, args=(server,))
try:
runner_heartbeat.start()
redistributor.start()
# Activate the server; this will keep running until you
# interrupt the program with Ctrl+C or Cmd+C
server.serve_forever()
except (KeyboardInterrupt, Exception):
# if any exception occurs, kill the thread
server.dead = True
runner_heartbeat.join()
redistributor.join()
if __name__ == "__main__":
serve()
在dispatcher的handle中处理各种来自其他client访问,oberserver会向diapatcher发送commit id ,但是在发送之前还是要询问下当前的dispatcher 服务是否正在运行,所以会有个status状态咨询请求,如果返回ok,则继续下一步向其发送commit id
于此同时dispatcher还要管理runner测试,当前的脚本是可以跑一百个测试线程,每一个线程如果想跑起来就必须要向其注册,只有注册成功的runner才会继续完成测试
test runner 文件
"""
This is the test runner.
It registers itself with the dispatcher when it first starts up, and then waits
for notification from the dispatcher. When the dispatcher sends it a 'runtest'
command with a commit id, it updates its repository clone and checks out the
given commit. It will then run tests against this version and will send back the
results to the dispatcher. It will then wait for further instruction from the
dispatcher.
"""
import argparse
import errno
import os
import re
import socket
import SocketServer
import subprocess
import time
import threading
import unittest
import helpers
class ThreadingTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
dispatcher_server = None # Holds the dispatcher server host/port information
last_communication = None # Keeps track of last communication from dispatcher
busy = False # Status flag
dead = False # Status flag
class TestHandler(SocketServer.BaseRequestHandler):
"""
The RequestHandler class for our server.
"""
command_re = re.compile(r"(\w+)(:.+)*")
def handle(self):
# self.request is the TCP socket connected to the client
self.data = self.request.recv(1024).strip()
command_groups = self.command_re.match(self.data)
command = command_groups.group(1)
if not command:
self.request.sendall("Invalid command")
return
if command == "ping":
print "pinged"
self.server.last_communication = time.time()
self.request.sendall("pong")
elif command == "runtest":
print "got runtest command: am I busy? %s" % self.server.busy
if self.server.busy:
self.request.sendall("BUSY")
else:
self.request.sendall("OK")
print "running"
commit_id = command_groups.group(2)[1:]
self.server.busy = True
self.run_tests(commit_id,
self.server.repo_folder)
self.server.busy = False
else:
self.request.sendall("Invalid command")
def run_tests(self, commit_id, repo_folder):
# update repo
output = subprocess.check_output(["./test_runner_script.sh",
repo_folder, commit_id])
print output
# run the tests
test_folder = os.path.join(repo_folder, "tests")
suite = unittest.TestLoader().discover(test_folder)
result_file = open("results", "w")
unittest.TextTestRunner(result_file).run(suite)
result_file.close()
result_file = open("results", "r")
# give the dispatcher the results
output = result_file.read()
helpers.communicate(self.server.dispatcher_server["host"],
int(self.server.dispatcher_server["port"]),
"results:%s:%s:%s" % (commit_id, len(output), output))
def serve():
range_start = 8900
parser = argparse.ArgumentParser()
parser.add_argument("--host",
help="runner's host, by default it uses localhost",
default="localhost",
action="store")
parser.add_argument("--port",
help="runner's port, by default it uses values >=%s" % range_start,
action="store")
parser.add_argument("--dispatcher-server",
help="dispatcher host:port, by default it uses " \
"localhost:8888",
default="localhost:8888",
action="store")
parser.add_argument("repo", metavar="REPO", type=str,
help="path to the repository this will observe")
args = parser.parse_args()
runner_host = args.host
runner_port = None
tries = 0
if not args.port:
runner_port = range_start
while tries < 100: try: server = ThreadingTCPServer((runner_host, runner_port), TestHandler) print server print runner_port break except socket.error as e: if e.errno == errno.EADDRINUSE: tries += 1 runner_port = runner_port + tries continue else: raise e else: raise Exception("Could not bind to ports in range %s-%s" % (range_start, range_start+tries)) else: runner_port = int(args.port) server = ThreadingTCPServer((runner_host, runner_port), TestHandler) server.repo_folder = args.repo dispatcher_host, dispatcher_port = args.dispatcher_server.split(":") server.dispatcher_server = {"host":dispatcher_host, "port":dispatcher_port} response = helpers.communicate(server.dispatcher_server["host"], int(server.dispatcher_server["port"]), "register:%s:%s" % (runner_host, runner_port)) if response != "OK": raise Exception("Can't register with dispatcher!") def dispatcher_checker(server): # Checks if the dispatcher went down. If it is down, we will shut down # if since the dispatcher may not have the same host/port # when it comes back up. while not server.dead: time.sleep(5) if (time.time() - server.last_communication) > 10:
try:
response = helpers.communicate(
server.dispatcher_server["host"],
int(server.dispatcher_server["port"]),
"status")
if response != "OK":
print "Dispatcher is no longer functional"
server.shutdown()
return
except socket.error as e:
print "Can't communicate with dispatcher: %s" % e
server.shutdown()
return
t = threading.Thread(target=dispatcher_checker, args=(server,))
try:
t.start()
# Activate the server; this will keep running until you
# interrupt the program with Ctrl-C
server.serve_forever()
except (KeyboardInterrupt, Exception):
# if any exception occurs, kill the thread
server.dead = True
t.join()
if __name__ == "__main__":
serve()
test_runner_script.sh
#!/bin/bash
REPO=1
COMMIT=2
source run_or_fail.sh
run_or_fail "Repository folder not found" pushd "REPO" 1> /dev/null
run_or_fail "Could not clean repository" git clean -d -f -x
run_or_fail "Could not call git pull" git pull
run_or_fail "Could not update to given commit hash" git reset --hard "COMMIT"