0%

Python实现git自动提交与远程仓库推送

  1. 前置准备
    1. 库依赖
    2. 处理命令行输入中文
    3. 处理cmd中的颜色
    4. 终端输出颜色函数
  2. 使用pygit2的功能函数
    1. 获取git配置
    2. 添加暂存
    3. 提交
    4. 获取当前分支信息
  3. 完整流程实现代码

基于libgit2的python实现——pygit2,读取文件夹分支更改情况并自动提交,再读取远程仓库列表执行push操作

前置准备

库依赖

1
2
3
import os
import time
import pygit2 as git

处理命令行输入中文

1
2
import locale
locale.setlocale(locale.LC_CTYPE, 'chinese')

处理cmd中的颜色

为了使命令行输出颜色在Windows终端中显示正常

1
2
from colorama import init
init(autoreset=True)

终端输出颜色函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def red(str: str):
return '\x1b[31m{:s}\x1b[0m'.format(str)


def green(str: str):
return '\x1b[32m{:s}\x1b[0m'.format(str)


def yellow(str: str):
return '\x1b[33m{:s}\x1b[0m'.format(str)


def blue(str: str):
return '\x1b[96m{:s}\x1b[0m'.format(str)


def gray(str: str):
return '\x1b[90m{:s}\x1b[0m'.format(str)

使用pygit2的功能函数

获取git配置

从当前仓库读取配置信息,如果不存在则读取全局的,还不存在就报错

1
2
3
4
5
6
7
8
def get_config(repo: git.Repository, key: str):
try:
val: str = repo.config.__getitem__(key)
return val
except:
print('获取配置[{:s}]失败,请通过以下命令配置:'.format(key))
print('git config --global {:s}=[...]'.format(key))
return None

使用:

1
2
username = get_config(repo, 'user.name')
# git config user.name

添加暂存

等价于:git add .

1
2
3
4
5
6
7
8
9
10
11
12
13
def git_add_all(repo: git.Repository):
"""
git add .
"""
try:
idx = repo.index
idx.add_all()
idx.write()
except Exception as e:
print(e)
return False

return True

提交

等价于:git commit -m "msg"

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def git_commit(repo: git.Repository, msg: str, username: str, useremail: str):
"""
git commit -m "msg"
"""
try:
author = git.Signature(username, useremail)
committer = author
tree = repo.index.write_tree()
parents = [] if repo.head_is_unborn else [repo.head.target]
ref = 'HEAD' if repo.head_is_unborn else repo.head.name
hash: str = repo.create_commit(
ref, author, committer, msg, tree, parents)
return hash
except Exception as e:
print('git_commit(): ', e)
return None

获取当前分支信息

1
2
3
4
5
6
7
def get_current_branch(repo: git.Repository):
for name in repo.branches.local:
branch = repo.lookup_branch(name)
if branch.is_head():
bName: str = branch.branch_name
return bName
return None

完整流程实现代码

逻辑为:

  1. 读取仓库和分支信息,若出现异常则报错并退出程序;
  2. 检查是否存在更改
    • 若存在,则要求用户完成一次提交;
    • 若不存在,直接进入下一步;
  3. 检查是否存在远程仓库
    • 若存在,则将当前分支逐一推送;
    • 若不存在,提示并退出程序;

由于推送远程仓库的操作太麻烦还需要读取ssh配置,懒得搞,就直接执行git push了事。

其实用pygit2主要是用来读取一些仓库信息的,比如分支列表、远程列表这种,要完成git addgit commit这类操作,还是用git命令来得方便,毕竟libgit2实在是太底层了,什么信息都要自己填,实在没必要。

以下是代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145

if __name__ == '__main__':
# 读取仓库信息
repo_path = os.getcwd()
try:
repo = git.Repository(repo_path)
except:
print(red('[{:s}]不是一个git仓库').format(repo_path))
exit()

# 检查是否发生更改,决定是否进行提交操作
status = [item for item in repo.status().items() if item[1] != 16384]
if len(list(status)) == 0:
print(yellow('仓库[{:s}]未发生更改').format(repo_path))
else:
# 获取文件更改信息
create_items = []
modify_items = []
delete_items = []
for item in status:
if item[1] == 128: # 新增
create_items.append(item[0])
elif item[1] == 256: # 修改
modify_items.append(item[0])
elif item[1] == 512: # 删除
delete_items.append(item[0])

# 获取提交者信息
username = get_config(repo, 'user.name')
useremail = get_config(repo, 'user.email')
if username is None or useremail is None:
exit()

# 打印文件更改信息与提交者信息
for item in create_items:
print(green('create: [{:s}]').format(item))
for item in modify_items:
print(yellow('modify: [{:s}]').format(item))
for item in delete_items:
print(red('delete: [{:s}]').format(item))

print('\nAuthor: {:s}[{:s}]'.format(gray(username), gray(useremail)))

# 构建提交信息
while True:
print(yellow('\n请输入commit信息(为空则自动填充时间)'))
msg = input()
msg = time.strftime('%Y年%m月%d日 %H:%M:%S', time.localtime(
time.time())) if msg == '' else msg

if len(msg) > 50:
print(gray('消息过长,请控制在50以内:'))
print(msg[0:50])
continue
break

# 提交 https://www.pygit2.org/recipes/git-commit.html
if not git_add_all(repo):
exit()
hash = git_commit(repo, msg, username, useremail)
if hash is None:
print(red('提交失败!'))
exit()

# 打印提交的信息
print(green('\n**********执行 commit 成功! **********'))
commit = repo[hash]

hash = blue(str(commit.id)[0:10] + '...')
time = yellow(time.strftime('%Y年%m月%d日 %H:%M:%S',
time.localtime(commit.commit_time)))
name = commit.author.name
email = commit.author.email

diff = repo.diff(commit.id, commit.parent_ids[0] if len(
commit.parent_ids) > 0 else '4b825dc642cb6eb9a060e54bf8d69288fbee4904')
new_lines = 0
rmv_lines = 0
files_changed = [[], [], []]
for patch in diff:
# ([更改的行的前后留白位置], [删除的行数], [新增的行数])
line_stat = patch.line_stats
new_lines += line_stat[2]
rmv_lines += line_stat[1]

line_modi = '{:s} {:s}'.format(green('+{:d}'.format(line_stat[2])), red(
'-{:d}'.format(line_stat[1]))) if patch.delta.is_binary == False else ''

if patch.delta.status == 2: # create
files_changed[0].append('* {type} {file} {line_modi}'.format(
file=gray(patch.delta.new_file.path),
type=green('create'),
line_modi=line_modi)
)
elif patch.delta.status == 1: # delete
files_changed[2].append('* {type} {file} {line_modi}'.format(
file=gray(patch.delta.new_file.path),
type=red('delete'),
line_modi=line_modi)
)
else: # modify and other
files_changed[1].append('* {type} {file} {line_modi}'.format(
file=gray(patch.delta.new_file.path),
type=yellow('modify'),
line_modi=line_modi)
)

files_changed = files_changed[0] + files_changed[1] + files_changed[2]

msg = commit.message.replace('\n', '')
print(
'{hash} {time} {author} {new_lines} {rmv_lines} {msg}'.format(
hash=hash, time=time, msg=msg,
author=gray('{name}[{email}]'.format(name=name, email=email)),
new_lines=green('+{:d}'.format(new_lines)),
rmv_lines=red('-{:d}'.format(rmv_lines)),
)
)
for item in files_changed:
print(item)

print(green('\n**********开始push到远程仓库**********'))

# 检查远程仓库列表
if len(repo.remotes) == 0:
print(gray("远程仓库列表为空,无需push"))
exit()
else:
# 当前分支
cur_branch = get_current_branch(repo)
if cur_branch is None:
print(red('分支读取出错,请通过git检查'))
exit()
print('Current Branch: [{:s}]'.format(blue(cur_branch)))

for remote in repo.remotes:
try:
print('\n正在推送远程仓库[{:s}]: {:s}'.format(
blue(remote.name), green(remote.url)))
# remote.push([repo.head.name]) # 不支持ssh,懒得搞了,草
os.system('git push {:s} {:s}'.format(remote.name, cur_branch))
print(yellow('ok'))
except Exception as e:
print(gray('失败: {:s}').format(red(str(e))))
exit()