一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

腳本之家,腳本語言編程技術及教程分享平臺!
分類導航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服務器之家 - 腳本之家 - Python - Python實現基于多線程、多用戶的FTP服務器與客戶端功能完整實例

Python實現基于多線程、多用戶的FTP服務器與客戶端功能完整實例

2020-12-02 00:56水·域 Python

這篇文章主要介紹了Python實現基于多線程、多用戶的FTP服務器與客戶端功能,結合完整實例形式分析了Python多線程、多用戶FTP服務器端與客戶端相關實現技巧與注意事項,需要的朋友可以參考下

本文實例講述了Python實現基于多線程、多用戶的FTP服務器客戶端功能。分享給大家供大家參考,具體如下:

項目介紹:

1. 用戶加密認證
2. 允許同時多用戶登錄
3. 每個用戶有自己的家目錄 ,且只能訪問自己的家目錄
4. 對用戶進行磁盤配額,每個用戶的可用空間不同
5. 允許用戶在ftp server上隨意切換目錄
6. 允許用戶查看當前目錄下文件
7. 允許上傳和下載文件,保證文件一致性
8. 文件傳輸過程中顯示進度條

實現的原理:

服務器端啟用端口監聽,并對每一連接啟用一個線程,對用戶登陸密碼采用SHA512進行加密并進行匹配,當用戶登陸成功后,實例化FTPS,并引導客戶端進入主命令模式,

然后實現FTP的上傳功能、下載功能、新建目錄、刪除文件或目錄、切換目錄等實例化操作,同時對相關上傳下載進行進度條顯示,服務器端顯示下載或上傳文件的大小等

客戶端與服務器協商建立連接后,進行用戶身份登陸,登陸成功接收服務器指令,轉入命令輸入窗口,同時對put 與 get命令進行判斷,實現特定的上傳與下載功能

核心代碼實現如下:

服務器端

main.py

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#!/usr/bin/env python3.5
# -*-coding:utf8-*-
import os,sys,socket,pickle
BASEDIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASEDIR)
from conf import setting
from core import file_handler
from core import db_handler
import select,hashlib
import threading
def login(username,password):
  """
  FTP登陸驗證函數
  :param username:
  :param password:
  :return:
  # testDict ={"username":"jjb","password":"123456","file_dir":"E:\python","file_size":500}
  # file = 'jjb.pkl'
  # fp = open(file,'wb')
  # pickle.dump(testDict,fp)
  # fp.close()
  f = open("jjb.pkl","rb")
  data = pickle.loads(f.read())
  f.close()
  print(data)
  """
  #實例化加密函數
  hash = hashlib.sha512()
  db= db_handler.handler(setting.DATABASE,username)
  if os.path.isfile(db):
    f = open(db,"rb")
    data = pickle.loads(f.read())
    f.close()
    if username == data["name"]:
      hash.update(bytes(data["password"],"utf8"))
      hash_pwd = hash.hexdigest()
      if hash_pwd == password:
        filedir = data["file_dir"]
        filesize = data["file_size"]
        return "True|%s|%s"%(filedir,filesize)
      else:
        return "False||"
    else:
      return "False||"
  else:
    return "False||"
def process(conn,addr):
  flage = "False"
  # 接收客戶端連接請求信息
  info = conn.recv(1000)
  if info.decode() == "connect":
    conn.send(bytes("login","utf8"))
  # 接收用戶及密碼信息
  while flage =="False":
    user_check =conn.recv(8000)
    # 分割用戶名及密碼
    username,password = str(user_check.decode()).split("|")
    # 調用登陸驗證函數
    login_ack = login(username,password)
    flage,home,size = str(login_ack).split("|")
    # print(flage,home,size)
    # print("user_input:",username,"user_pass:",password)
    if flage =="True":
      # 登陸成功發送登陸確認信息給客戶端
      conn.send(bytes("login_ack","utf8"))
      # 實例化FTPserver
      ftp = file_handler.FTPs(username,conn,home,size) # 登陸用戶,數據連接,工作目錄,磁盤配額
      ftp.run()
      break
    else:
      # 登陸失敗,發送給客戶端重新驗證
      conn.send(bytes("登陸失??!","utf8"))
def ftp_server():
  '''
  啟動FTP服務器端,開啟線程監聽
  :return:
  '''
  server = socket.socket()
  server.bind((setting.IP_PORT["host"],setting.IP_PORT["port"]))
  server.listen(10)
  while True:
    r,w,e = select.select([server,], [], [], 1)
    for i,server in enumerate(r):
      conn,addr = server.accept()
      # 創建線程
      t = threading.Thread(target=process, args=(conn, addr))
      # 啟動線程
      t.start()
  server.close()
def run():
  ftp_server()
if __name__ =="__main__":
  run()

file_handler.py:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
#!/usr/bin/env python3.5
# -*-coding:utf8-*-
import os,sys
BASEDIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASEDIR)
import re
from core import db_handler
from conf import setting
import pickle
class FTPs(object):
  '''
  ftp操作命令方法:
  '''
  def __init__(self,username,conn,home,total_size):
    '''
    初始化參數
    :param username: 操作用戶名
    :param conn: sock連接
    :param home: 用戶根目錄
    :param total_size: 磁盤配額
    :return:
    '''
    self.username = username
    self.conn = conn
    self.root = home
    self.home = self.root
    self.total_size = int(total_size)
    self.cmd_file = None # 文件指令
    self.psize = 4096 # 文件分片
  def getdirsize(self,space):
    '''
    計算磁盤空間大小
    :return:
    '''
    self.dirsize = 0
    for root,dirs,files in os.walk(space):
      self.dirsize += (sum([os.path.getsize(os.path.join(root,name))for name in files])/1024)
    return int(self.dirsize)
  def put(self):
    '''
    上傳文件
    :return:
    '''
    if self.cmd_file:
      self.user_space = int(self.getdirsize(self.root)/1024)
      # 組合接收字符串
      self.file_root = '%s\\%s'% (self.home,self.cmd_file)
      # # 獲取文件名
      self.f =os.path.basename(self.file_root)
      if os.path.isdir(self.home):
        os.chdir(self.home)
      else:
        os.makedirs(self.home)
        os.chdir(self.home)
      try:
        self.conn.send(bytes("f_ack","utf8"))
        self.size = str(self.conn.recv(1024).decode()).split("|")
        if self.size[0]== "fsize":
          self.fss = int(self.size[1])
          self.f_total_size = int(self.user_space + (self.fss/1024/1024))
          if self.f_total_size < self.total_size: # 判斷空間是否超額
            self.conn.send(bytes("f_ack_ready","utf8"))
            self.bsize = 0
            print("需要上傳文件大?。?quot;,self.fss)
            # 打開文件
            f=open(self.f,'wb')
            while self.bsize < self.fss:
              data = self.conn.recv(self.psize)
              self.bsize += len(data)
              f.write(data)
            self.conn.send(bytes("ok","utf8"))
            print("實際已上傳文件大?。?quot;,self.bsize)
          else:
            self.conn.send(bytes("上傳空間不足!無法上傳,你當前磁盤配額為%sM"%self.total_size,"utf8"))
      except Exception as ex:
        self.conn.send(bytes(ex,"utf8"))
    else:
      self.conn.send(bytes("請上傳文件,文件不能為空","utf8"))
  def get(self):
    '''
    下載文件
    :return:
    '''
    if self.cmd_file:
      os.chdir(self.home) # 進入用戶根目錄
      self.file = os.getcwd()+"\\"+ self.cmd_file
      if os.path.isfile(self.file):
        f = open(self.file, 'rb')
        self.fsize = os.path.getsize(self.file) # 獲取要發送文件的大小
        self.conn.send(bytes("f_ack_read","utf8"))
        self.conn.recv(1000)
        print("需發送文件大?。?quot;,self.fsize)
        self.conn.send(bytes("fsize|%s"%self.fsize,"utf8")) # 發送文件大小及要發送準備完畢指令
        if self.conn.recv(1000).decode() == "f_ack": # 接收對方是否準備就緒
          self.fsize = int(self.fsize)
          self.size = 0
          ack =""
          while self.size < self.fsize and ack !="ok":
            data = f.read(self.fsize) # 一次讀取分片大小4096
            self.conn.send(data)
            self.size += len(data)
          print("實際發送文件大?。?quot;,self.size)
          ack = self.conn.recv(1000).decode() # 接收客戶端是否下載完指令
          self.conn.send(bytes("成功","utf8"))
        else:
          self.conn.send(bytes("接收失敗","utf8"))
      else:
        self.conn.send(bytes("文件不存在","utf8"))
    else:
      self.conn.send(bytes("請輸入文件名","utf8"))
  def dir(self):
    '''
    查看文件
    :return:
    '''
    self.current_space =int(self.getdirsize(self.home))
    # 文件列表
    self.li = ""
    # 目錄列表
    self.dl = ""
    try:
      os.chdir(self.home)
    except:
      os.makedirs(self.home)
      os.chdir(self.home)
    try:
      if os.listdir(os.getcwd()):
        for self.i in os.listdir(os.getcwd()):
          self.file = os.getcwd()+'\\'+self.i
          if os.path.isfile(self.file):
            # 獲取文件大小
            self.fsize = int(os.path.getsize(self.file)/1024)
            if self.fsize < 1:
              self.fsize = 4
            else:
              self.fsize +=4
            self.li += '%s -rw-rw-rw- 占用大小:%skb\r\n'% (self.i,self.fsize)
          else:
            self.dl += '%s\r\n'%self.i
        self.conn.send(bytes("目錄:\r\n\r\n%s 文件:\r\n%s\r\n \r\n當前目錄空間大?。?skb"%(self.dl,self.li,self.current_space),"utf8"))
      else:
        self.conn.send(bytes("當前目錄為:%s"%(self.home),"utf8"))
    except Exception as ex:
      self.conn.send(bytes(ex,"utf8"))
  def cd(self):
    '''
    進入目錄
    :return:
    '''
    if self.cmd_file:
      os.chdir(self.home) # 先進入到工作目錄
      self.dir_change = os.path.abspath(os.path.join(self.home,"%s\%s"%(self.home,self.cmd_file)))
      if self.root in self.dir_change:
        try:
          os.chdir(self.dir_change)
          self.home = self.dir_change
          self.conn.send(bytes("當前工作目錄為:%s"%self.home,"utf8"))
        except:
          os.makedirs(self.dir_change)
          os.chdir(self.dir_change)
          self.home = self.dir_change
          self.conn.send(bytes("當前工作目錄為:%s"%self.home,"utf8"))
      else:
          self.conn.send(bytes("當前工作目錄為:%s  更改失??!"%self.home,"utf8"))
    else:
      os.chdir(self.home)
      self.conn.send(bytes("當前工作目錄為:%s"%self.home,"utf8"))
  def mkd(self):
    '''
    創建目錄
    :return:
    '''
    if self.cmd_file:
      try:
        os.makedirs(self.cmd_file)
        self.conn.send(bytes("創建目錄成功!","utf8"))
      except Exception as ex:
        self.conn.send(bytes("創建目錄失??!原因:%s"%ex,"utf8"))
    else:
      self.conn.send(bytes("請輸入文件夾名!","utf8"))
  def delete(self):
    '''
    刪除文件
    :return:
    '''
    os.chdir(self.home) # 進入用戶根目錄
    try:
      self.file = self.home+'\\'+ self.cmd_file
      if os.path.isfile(self.file):
        os.remove(self.cmd_file)
        self.conn.send(bytes("文件:%s刪除成功!"%self.cmd_file,"utf8"))
      else:
        os.removedirs(self.cmd_file)
        self.conn.send(bytes("目錄刪除成功!","utf8"))
        os.chdir(self.root)
    except Exception:
      if os.path.isdir(self.root):
        self.conn.send(bytes("刪除失??!","utf8"))
      else:
        os.makedirs(self.root)
        self.conn.send(bytes("刪除失??!","utf8"))
  def help(self):
    '''
    FTP幫助信息
    :return:
    '''
    self.conn.send(bytes("""
    FTP服務器操作方法有: put------>上傳文件至服務器
               get------>從服務器上下載文件
               dir------>查看服務器文件列表
               cd------->進入指定文件夾
               delete--->刪除文件
               mkd ----->創建目錄
               help----->幫助信息
               q ------->退出
    ""","utf8"))
  def run(self):
    while True:
      # try:
       # # 接收客戶端發來的命令信息
      self.cmd = self.conn.recv(1000)
      self.cmd_action = str(self.cmd.decode())
      # 判斷命令是否含有空格
      self.fg = re.search("\s","%s"%self.cmd_action)
      if self.fg:
        self.cmd_action,self.cmd_file = str(self.cmd_action).split(" ")
      else:
        self.cmd_file =None
      # print("cmd_action:",self.cmd_action,"cmd_file:",self.cmd_file)
      if hasattr(FTPs,self.cmd_action):
        func = getattr(self,self.cmd_action)
        func()
        continue
      else:
        self.conn.send(b'command is not found!')
        continue
      # except Exception as ex:
      #   print("系統異常:%s"%ex)

客戶端

client.py:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#!/usr/bin/env python3.5
# -*-coding:utf8-*-
import sys,os,re
import socket,hashlib
BASEDIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASEDIR)
from core import file_handler
from conf import setting
def login():
  hash = hashlib.sha512()
  while True:
    user_input = input("請輸入用戶名:").strip()
    pass_input = input("請輸入密碼:").strip()
    if len(user_input) !=0 and len(pass_input) != 0:
      hash.update(bytes(pass_input,"utf8"))
      sha_pwd = hash.hexdigest()
      user = "%s|%s"% (user_input,sha_pwd)
      return user
      break
def ftp_client():
  sk = socket.socket()
  sk.connect((setting.IP_PORT["host"],setting.IP_PORT["port"]))
  while True:
    flage = False
    sk.send(bytes("connect","utf8"))
    msg = sk.recv(100)
    print("歡迎訪問FTP服務器,請根據提示進行操作")
    if msg.decode() == "login":
      while flage == False:
        login_user =login()
        username,password = str(login_user).split("|")
        sk.send(bytes(login_user,"utf8"))
        user_info = sk.recv(1000)
        if user_info.decode() == "login_ack":
          print("登陸成功!")
          flage = True
          break
        print(user_info.decode())
      while flage:
        cmd_action = input("請輸入操作命令如:get fy.py or help :").strip()
        if len(cmd_action) == 0:continue
        if cmd_action == "q":
          sys.exit()
        # 判斷命令是否含有空格
        fg = re.search("\s","%s"%cmd_action)
        if fg:
          cmd,cmd_file = str(cmd_action).split(" ")
          ftp = file_handler.ftpc(sk,username,cmd_action,setting.DATABASE["local"])
          if hasattr(ftp,cmd):
            func = getattr(ftp,cmd)
            func()
            continue
        else:
          cmd_file =None
        sk.send(bytes(cmd_action,"utf8"))
        rec_msg = sk.recv(8000)
        print(rec_msg.decode())
      if flage == "False":
        sk.send(bytes("connect","utf8"))
  sk.close()
def run():
  ftp_client()
if __name__ == "__main__":
  run()

file_handler.py:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python3.5
# -*-coding:utf8-*-
import sys,os,re
import socket
BASEDIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASEDIR)
class ftpc(object):
  def __init__(self,sk,username,cmd_action,home):
    self.sk = sk
    self.username = username
    self.cmd_action = cmd_action
    self.home = home
  def put(self):
    '''
    上傳文件
    :return:
    '''
    try:
      os.chdir(self.home)
    except:
      os.makedirs(self.home)
      os.chdir(self.home)
    # 判斷命令是否含有空格
    fg = re.search("\s","%s"%self.cmd_action)
    if fg:
      self.cmd,self.cmd_file = str(self.cmd_action).split(" ")
      if os.path.isfile(os.getcwd()+"\\"+self.cmd_file):
        self.sk.send(bytes(self.cmd_action,"utf8")) # 發送動作命令
        rec_msg = self.sk.recv(8000)
        if rec_msg.decode() == "f_ack":
          f = open(self.cmd_file, 'rb')
          self.fsize = os.path.getsize(self.cmd_file) # 獲取要發送文件的大小
          self.sk.send(bytes("fsize|%s"%self.fsize,"utf8")) # 發送文件大小
          self.ack = self.sk.recv(1000)
          if self.ack.decode() =="f_ack_ready":
            self.fsize = int(self.fsize)
            self.size = 0
            ack =""
            while self.size < self.fsize and ack !="ok":
              data = f.read(4095) # 一次讀取分片大小4095
              self.sk.send(data)
              self.size += len(data)
              count = int(self.size/self.fsize*100)
              print('#'*count,"->",(count),"%")
            ack = self.sk.recv(1000).decode()
            if ack =="ok":
              print("上傳成功")
            else:
              print("上傳失敗")
          else:
            print(self.ack.decode())
        else:
          print("上傳文件失敗:%s"%rec_msg.decode())
      else:
        print("上傳文件失敗,請輸入正確的文件名!")
    else:
      print("上傳文件失敗,請輸入正確的文件名!")
  def get(self):
    '''
    下載文件
    :return:
    '''
    try:
      os.chdir(self.home)
    except:
      os.makedirs(self.home)
      os.chdir(self.home)
    # 判斷命令是否含有空格
    fg = re.search("\s","%s"%self.cmd_action)
    if fg:
      self.cmd,self.cmd_file = str(self.cmd_action).split(" ")
    else:
      self.cmd_file =None
    self.sk.send(bytes(self.cmd_action,"utf8"))
    rec_msg = self.sk.recv(8000)
    if rec_msg.decode() == "f_ack_read":
      self.rec = self.sk.send(bytes("ok","utf8"))
      self.rec_size = self.sk.recv(2048)
      self.ack_rec= str(self.rec_size.decode()).split("|")
      self.sk.send(bytes("f_ack","utf8"))
      self.ack_s =int(self.ack_rec[1])
      print(self.ack_s)
      self.re_s = 0
      f = open(self.cmd_file,"wb")
      while self.re_s < self.ack_s:
        xx = self.re_s/self.ack_s*100
        data = self.sk.recv(4096)
        self.re_s += len(data)
        # print(data.decode("gbk"))
        f.write(data)
        count = int(xx)
        print('#'*count,"->",(count+1),"%")
      self.sk.send(bytes("ok","utf8"))
      print(self.re_s)
      self.ack_ok = self.sk.recv(1024)
      print("接收文件:%s"%self.ack_ok.decode())
    else:
      print("接收文件失?。?s"%rec_msg.decode())

如下是重要模塊進行收藏:

OS模塊

os.getcwd() 獲取當前工作目錄,即當前python腳本工作的目錄路徑
os.chdir("dirname")  改變當前腳本工作目錄;相當于shell下cd
os.curdir  返回當前目錄: ('.')
os.pardir  獲取當前目錄的父目錄字符串名:('..')
os.makedirs('dirname1/dirname2')    可生成多層遞歸目錄
os.removedirs('dirname1')    若目錄為空,則刪除,并遞歸到上一級目錄,如若也為空,則刪除,依此類推
os.mkdir('dirname')    生成單級目錄;相當于shell中mkdir dirname
os.rmdir('dirname')   刪除單級空目錄,若目錄不為空則無法刪除,報錯;相當于shell中rmdir dirname
os.listdir('dirname')    列出指定目錄下的所有文件和子目錄,包括隱藏文件,并以列表方式打印
os.remove()  刪除一個文件
os.rename("oldname","newname")  重命名文件/目錄
os.stat('path/filename')  獲取文件/目錄信息
os.sep    輸出操作系統特定的路徑分隔符,win下為"\\",Linux下為"/"
os.linesep    輸出當前平臺使用的行終止符,win下為"\t\n",Linux下為"\n"
os.pathsep    輸出用于分割文件路徑的字符串
os.name    輸出字符串指示當前使用平臺。win->'nt'; Linux->'posix'
os.system("bash command")  運行shell命令,直接顯示
os.environ  獲取系統環境變量
os.path.abspath(path)  返回path規范化的絕對路徑
os.path.split(path)  將path分割成目錄和文件名二元組返回
os.path.dirname(path)  返回path的目錄。其實就是os.path.split(path)的第一個元素
os.path.basename(path)  返回path最后的文件名。如何path以/或\結尾,那么就會返回空值。即os.path.split(path)的第二個元素
os.path.exists(path)  如果path存在,返回True;如果path不存在,返回False
os.path.isabs(path)  如果path是絕對路徑,返回True
os.path.isfile(path)  如果path是一個存在的文件,返回True。否則返回False
os.path.isdir(path)  如果path是一個存在的目錄,則返回True。否則返回False
os.path.join(path1[, path2[, ...]])  將多個路徑組合后返回,第一個絕對路徑之前的參數將被忽略
os.path.getatime(path)  返回path所指向的文件或者目錄的最后存取時間
os.path.getmtime(path)  返回path所指向的文件或者目錄的最后修改時間

sys模塊

sys.argv           命令行參數List,第一個元素是程序本身路徑
sys.exit(n)        退出程序,正常退出時exit(0)
sys.version        獲取Python解釋程序的版本信息
sys.maxint         最大的Int值
sys.path           返回模塊的搜索路徑,初始化時使用PYTHONPATH環境變量的值
sys.platform       返回操作系統平臺名稱

?
1
2
sys.stdout.write('please:')
val = sys.stdin.readline()[:-1]

re 模塊

匹配格式

 

模式 描述
^ 匹配字符串的開頭
$ 匹配字符串的末尾。
. 匹配任意字符,除了換行符,當re.DOTALL標記被指定時,則可以匹配包括換行符的任意字符。
[...] 用來表示一組字符,單獨列出:[amk] 匹配 'a','m'或'k
[^...] 不在[]中的字符:[^abc] 匹配除了a,b,c之外的字符。
re* 匹配0個或多個的表達式。
re+ 匹配1個或多個的表達式。
re? 匹配0個或1個由前面的正則表達式定義的片段,非貪婪方式
re{ n}
re{ n,} 精確匹配n個前面表達式。
re{ n, m} 匹配 n 到 m 次由前面的正則表達式定義的片段,貪婪方式
a| b 匹配a或b
(re) G匹配括號內的表達式,也表示一個組
(?imx) 正則表達式包含三種可選標志:i, m, 或 x 。只影響括號中的區域。
(?-imx) 正則表達式關閉 i, m, 或 x 可選標志。只影響括號中的區域。
(?: re) 類似 (...), 但是不表示一個組
(?imx: re) 在括號中使用i, m, 或 x 可選標志
(?-imx: re) 在括號中不使用i, m, 或 x 可選標志
(?#...) 注釋.
(?= re) 前向肯定界定符。如果所含正則表達式,以 ... 表示,在當前位置成功匹配時成功,否則失敗。但一旦所含表達式已經嘗試,匹配引擎根本沒有提高;模式的剩余部分還要嘗試界定符的右邊。
(?! re) 前向否定界定符。與肯定界定符相反;當所含表達式不能在字符串當前位置匹配時成功
(?> re) 匹配的獨立模式,省去回溯。
\w 匹配字母數字
\W 匹配非字母數字
\s 匹配任意空白字符,等價于 [\t\n\r\f].
\S 匹配任意非空字符
\d 匹配任意數字,等價于 [0-9].
\D 匹配任意非數字
\A 匹配字符串開始
\Z 匹配字符串結束,如果是存在換行,只匹配到換行前的結束字符串。c
\z 匹配字符串結束
\G 匹配最后匹配完成的位置。
\b 匹配一個單詞邊界,也就是指單詞和空格間的位置。例如, 'er\b' 可以匹配"never" 中的 'er',但不能匹配 "verb" 中的 'er'。
\B 匹配非單詞邊界。'er\B' 能匹配 "verb" 中的 'er',但不能匹配 "never" 中的 'er'。
\n, \t, 等. 匹配一個換行符。匹配一個制表符。等
\1...\9 匹配第n個分組的子表達式。
\10 匹配第n個分組的子表達式,如果它經匹配。否則指的是八進制字符碼的表達式。

 

正則表達式常用5種操作

re.match(pattern, string) # 從頭匹配
re.search(pattern, string) # 匹配整個字符串,直到找到一個匹配
re.split() # 將匹配到的格式當做分割點對字符串分割成列表

?
1
2
>>>m = re.split("[0-9]", "alex1rain2jack3helen rachel8")
>>>print(m)

輸出:['alex', 'rain', 'jack', 'helen rachel', '']

re.findall() # 找到所有要匹配的字符并返回列表格式

?
1
2
>>>m = re.findall("[0-9]", "alex1rain2jack3helen rachel8")
>>>print(m)

輸出:['1', '2', '3', '8']

re.sub(pattern, repl, string, count,flag) # 替換匹配到的字符

?
1
2
m=re.sub("[0-9]","|", "alex1rain2jack3helen rachel8",count=2 )
print(m)

輸出:alex|rain|jack3helen rachel8

正則表達式實例

字符匹配

 

實例 描述
python 匹配 "python".

 

字符類

 

實例 描述
[Pp]ython 匹配 "Python" 或 "python
rub[ye] 匹配 "ruby" 或 "rube
[aeiou] 匹配中括號內的任意一個字母
[0-9] 匹配任何數字。類似于 [0123456789]
[a-z] 匹配任何小寫字母
[A-Z] 匹配任何大寫字母
[a-zA-Z0-9] 匹配任何字母及數字
[^aeiou] 除了aeiou字母以外的所有字符
[^0-9] 匹配除了數字外的字符

 

特殊字符類

 

實例 描述
. 匹配除 "\n" 之外的任何單個字符。要匹配包括 '\n' 在內的任何字符,請使用象 '[.\n]' 的模式。
\d 匹配一個數字字符。等價于 [0-9]。
\D 匹配一個非數字字符。等價于 [^0-9]。
\s 匹配任何空白字符,包括空格、制表符、換頁符等等。等價于 [ \f\n\r\t\v]。
\S 匹配任何非空白字符。等價于 [^ \f\n\r\t\v]。
\w 匹配包括下劃線的任何單詞字符。等價于'[A-Za-z0-9_]'。
\W 匹配任何非單詞字符。等價于 '[^A-Za-z0-9_]'。

 

re.match與re.search的區別

re.match只匹配字符串的開始,如果字符串開始不符合正則表達式,則匹配失敗,函數返回None;而re.search匹配整個字符串,直到找到一個匹配。

希望本文所述對大家Python程序設計有所幫助。

原文鏈接:http://www.cnblogs.com/IPYQ/p/5540076.html

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 80日本xxxxxxxxx| 日本高清无吗 | 成人二区 | 青草视频久久 | 男人操美女逼视频 | 亚洲日韩欧美一区二区在线 | 日本加勒比一区 | 国产一区二区三区丶四区 | 国产精品对白刺激久久久 | ts人妖另类国产 | 大肚孕妇的高h辣文 | 欧美xxoo做爰猛烈视频 | 日韩aⅴ在线观看 | 色中色官网 | 精品国产91久久久久久久a | 花唇肿胀无法合拢双性 | 国内精品露脸在线视频播放 | 国产拍拍视频一二三四区 | 国产盗摄美女嘘嘘视频 | 嫩草影院地址一地址二 | 公园吃女人奶野战视频 | 青草视频网址 | 2020国产精品永久在线观看 | 免费看黄色片的网站 | juy799大岛优香在线观看 | 清纯漂亮女友初尝性过程 | 国产亚洲欧美日韩综合综合二区 | 午夜国产精品影院在线观看 | 国产一区二区三区毛片 | 四虎最新永久免费视频 | 艾秋麻豆果冻传媒老狼仙踪林 | 天堂69亚洲精品中文字幕 | 香蕉97超级碰碰碰免费公 | 99国产在线视频 | 四虎免费看片 | 亚洲va在线va天堂va偷拍 | 日韩中文字幕视频在线观看 | 网红刘婷hd国产高清 | 色欲麻将| 久久国产视频网站 | 草莓丝瓜芭乐樱桃榴莲色多黄 |