本文實例為大家分享了python3實現ftp服務功能的具體代碼,供大家參考,具體內容如下:
功能介紹:
可執行的命令:
ls
pwd
cd
put
rm
get
mkdir
1、用戶加密認證
2、允許多用戶同時登錄
3、每個用戶有自己的家目錄,且只可以訪問自己的家目錄
4、允許在自己家目錄下隨意切換目錄
5、允許上傳下載文件,且文件一致
6、傳輸過程中顯示進度條
server main 代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
|
# Author by Andy
# _*_ coding:utf-8 _*_
"""Small multi-user, FTP-like file server built on ``socketserver``.

Each client command is a JSON object whose ``"action"`` key names one of the
handler methods (``auth``, ``ls``, ``pwd``, ``cd``, ``put``, ``rm``, ``get``,
``mkdir``).  Replies are short status strings/codes sent back on the same
socket; the exact reply of each command is described in its docstring.
"""
import hashlib
import json
import os
import shutil
import socketserver
import subprocess
import sys
import time

base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(base_dir)
# Must stay below the sys.path tweak: ``conf`` lives under ``base_dir``.
from conf import userdb_set


class Ftp_server(socketserver.BaseRequestHandler):
    """Request handler serving one client connection.

    ``socketserver`` creates one handler instance per connection, so the
    per-user state (home directory, current directory) is kept on the
    instance.  The process-wide working directory is never changed, because
    ``os.chdir`` would be shared by every thread of the threading server.
    """

    # Only these attribute names may be invoked through the "action" field.
    allowed_actions = ('auth', 'get', 'cd', 'rm', 'pwd', 'ls', 'put', 'mkdir')
    # Directory that contains one home directory per user name.
    home_root = '/home'
    # File that ``loging`` appends to.
    log_file = '/root/ftp/ftpserver/log/server.log'
    # Maximum bytes read from the socket per ``recv`` call.
    recv_size = 1024
    # Bytes read from disk per chunk while streaming a download.
    send_chunk_size = 65536

    # Resolved home directory of the logged-in user; empty until ``auth``
    # succeeds.
    user_home_dir = ''
    # Resolved current directory of this connection.
    user_current_dir = ''

    def _resolve(self, path, follow_final=True):
        """Map a client-supplied *path* to an absolute path inside the home.

        Relative paths are interpreted against this connection's current
        directory.  Symbolic links and ``..`` components are resolved before
        the containment check.

        :param path: path string received from the client.
        :param follow_final: when False the last path component is not
            followed if it is a symlink (used for operations that act on the
            directory entry itself, such as ``rm``).
        :return: the absolute path, or ``None`` when no user is logged in or
            the path falls outside the user's home directory.
        """
        joined = os.path.join(self.user_current_dir, path)
        if follow_final:
            resolved = os.path.realpath(joined)
        else:
            parent, leaf = os.path.split(os.path.normpath(joined))
            resolved = os.path.join(os.path.realpath(parent), leaf)
        home = self.user_home_dir
        # commonpath compares whole components, unlike a substring test.
        if not home or os.path.commonpath([home, resolved]) != home:
            return None
        return resolved

    def auth(self, *args):
        """Verify user name and password.

        Reads ``username`` and ``password`` from the command dict and
        compares the password with the value stored in the user database
        file.  Sends ``'0'`` on success and ``'1'`` on failure.  On success
        the user's home (``home_root/<username>``) becomes both the home and
        the current directory of this connection.

        The password is compared verbatim with the stored value; presumably
        the client sends the same digest the database stores -- verify
        against the client and the user-database script.
        """
        cmd_dic = args[0]
        username = cmd_dic["username"]
        password = cmd_dic["password"]
        with open(userdb_set.userdb_set(), 'r') as f:
            user_info = json.load(f)

        home = None
        if username in user_info and password == user_info[username]:
            candidate = os.path.realpath(
                os.path.join(self.home_root, username))
            # Refuse the login rather than fail later if the home is missing.
            if os.path.isdir(candidate):
                home = candidate

        if home is not None:
            self.user_home_dir = home
            self.user_current_dir = home
            self.request.sendall('0'.encode())
            self.loging("%s login successed" % username)
        else:
            self.request.sendall('1'.encode())
            self.loging("%s login failed" % username)

    ##########################################
    def get(self, *args):
        """Send a file to the client.

        Reply sequence: ``'0'`` (file is ready) or ``'1'`` (file not found /
        not accessible); after a client acknowledgement the file size; after
        a second acknowledgement the raw file bytes followed by the
        hexadecimal MD5 digest of those bytes.
        """
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        target = self._resolve(cmd_dic["filename"])
        if target is None or not os.path.isfile(target):
            self.request.sendall('1'.encode('utf-8'))
            return
        try:
            source = open(target, 'rb')
        except OSError as exc:
            self.loging(str(exc))
            self.request.sendall('1'.encode('utf-8'))
            return
        with source:
            self.request.sendall('0'.encode('utf-8'))  # confirm the file exists
            self.request.recv(self.recv_size)
            # Size of the file actually opened, not of whatever the name
            # points to by now.
            size = os.fstat(source.fileno()).st_size
            self.request.sendall(str(size).encode('utf-8'))
            self.request.recv(self.recv_size)
            digest = hashlib.md5()
            # Fixed-size chunks: binary files have no meaningful "lines".
            for chunk in iter(lambda: source.read(self.send_chunk_size), b''):
                digest.update(chunk)
                self.request.sendall(chunk)
            self.request.sendall(digest.hexdigest().encode('utf-8'))
            print('From server:Md5 value has been sended!')

    ###########################################
    def cd(self, *args):
        """Execute the cd command.

        Changes this connection's current directory to ``path`` and replies
        with ``'Change dir successfully!'``, ``'Permission Denied!'`` (target
        outside the home directory) or ``'Directory not found!'``.
        """
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        target = self._resolve(cmd_dic['path'])
        if target is None:
            reply = 'Permission Denied!'
        elif not os.path.isdir(target):
            reply = 'Directory not found!'
        else:
            self.user_current_dir = target
            reply = 'Change dir successfully!'
        self.request.sendall(reply.encode("utf-8"))
        self.loging(reply)

    ###########################################
    def rm(self, *args):
        """Delete a file or directory tree after client confirmation.

        Sends ``'0'`` when the target exists (then waits for the client's
        answer; ``'0'`` means "go ahead") or ``'1'`` when it does not exist,
        lies outside the home directory, or is the home directory itself.
        After the confirmation step a human-readable result line is sent.
        """
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        filename = cmd_dic['filename']
        # Do not follow a final symlink: remove the link, not its target.
        target = self._resolve(filename, follow_final=False)
        if (target is None or target == self.user_home_dir
                or not os.path.lexists(target)):
            self.request.sendall('1'.encode("utf-8"))
            return

        self.request.sendall('0'.encode("utf-8"))  # confirm the file exists
        client_response = self.request.recv(self.recv_size).decode()
        reply = 'File %s not deleted!' % filename
        if client_response == '0':
            try:
                if os.path.isdir(target) and not os.path.islink(target):
                    shutil.rmtree(target)
                else:
                    os.remove(target)
            except OSError as exc:
                self.loging(str(exc))
            else:
                reply = 'File %s has been deleted!' % filename
                # The current directory may just have been removed.
                if not os.path.isdir(self.user_current_dir):
                    self.user_current_dir = self.user_home_dir
        self.request.sendall(reply.encode("utf-8"))
        self.loging(reply)

    ########################################
    def pwd(self, *args):
        """Execute the pwd command: send this connection's current directory."""
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        self.request.sendall(self.user_current_dir.encode("utf-8"))

    #############################################
    def ls(self, *args):
        """Execute the ls command.

        Sends the output of ``ls -l`` for ``path``.  When the listing fails
        the error text of ``ls`` (or a short message) is sent instead, so the
        client always receives a reply.
        """
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        target = self._resolve(cmd_dic['path'])
        if target is None:
            server_response = 'Permission Denied!'.encode("utf-8")
        else:
            # Argument list, no shell: the client-supplied path cannot inject
            # commands; "--" stops it from being parsed as an option.
            result = subprocess.run(['ls', '-l', '--', target],
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
            server_response = (result.stdout or result.stderr
                               or 'Directory not found!'.encode("utf-8"))
        self.request.sendall(server_response)

    ############################################
    def put(self, *args):
        """Receive a file from the client.

        Reads ``filename`` and ``size`` from the command dict, replies
        ``'200'`` (ready to receive) and then reads exactly ``size`` bytes
        from the socket.  If a regular file of that name already exists the
        upload is stored as ``<filename>.new``.  ``'210'`` (not ready) is
        sent when the destination is outside the home directory or cannot be
        opened -- NOTE(review): confirm the client treats any reply other
        than ``'200'`` as a refusal.

        :raises ConnectionError: if the peer closes the connection before
            ``size`` bytes have arrived.
        """
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        filename = cmd_dic["filename"]
        filesize = int(cmd_dic["size"])
        target = self._resolve(filename)
        if target is None:
            self.loging('Permission Denied!')
            self.request.sendall('210'.encode())
            return
        if os.path.isfile(target):
            target += '.new'
        try:
            sink = open(target, 'wb')
        except OSError as exc:
            self.loging(str(exc))
            self.request.sendall('210'.encode())
            return

        with sink:
            self.request.sendall('200'.encode())
            receive_size = 0
            while receive_size < filesize:
                # Never read past the declared size: the following bytes
                # belong to the client's next command.
                remaining = filesize - receive_size
                data = self.request.recv(min(self.recv_size, remaining))
                if not data:
                    # recv() returning b'' means the peer closed the socket.
                    raise ConnectionError(
                        'connection closed after %d of %d bytes'
                        % (receive_size, filesize))
                sink.write(data)
                receive_size += len(data)

        data = "File %s has been uploaded successfully!" % filename
        self.loging(data)
        print(data)

    ################################################
    def mkdir(self, *args):
        """Create a directory.

        Sends ``'0'`` when the directory has been made and ``'1'`` when the
        name already exists, lies outside the home directory, or cannot be
        created.
        """
        cmd_dic = args[0]
        self.loging(json.dumps(cmd_dic))
        target = self._resolve(cmd_dic['dir_name'], follow_final=False)
        if target is None or os.path.lexists(target):
            self.request.sendall('1'.encode("utf-8"))
            return
        try:
            os.mkdir(target)
        except OSError as exc:
            self.loging(str(exc))
            self.request.sendall('1'.encode("utf-8"))
        else:
            self.request.sendall('0'.encode("utf-8"))

    #############################################
    def loging(self, data):
        """Append ``data`` to the server log, prefixed with the local time."""
        localtime = time.asctime(time.localtime(time.time()))
        with open(self.log_file, 'a', encoding='utf-8') as f:
            f.write('%s-->' % localtime + data + '\n')

    ##############################################
    def handle(self):
        """Serve commands from one client until it disconnects or errors.

        Each command is expected to arrive as a single JSON document of at
        most ``recv_size`` bytes.  Unknown actions are ignored; any action
        other than ``auth`` issued before a successful login ends the
        connection.
        """
        while True:
            try:
                raw = self.request.recv(self.recv_size)
                if not raw:
                    break  # peer closed the connection
                self.data = raw.decode()
                cmd_dic = json.loads(self.data)
                action = cmd_dic["action"]
                # Whitelist instead of hasattr(): a client must not be able
                # to call arbitrary attributes such as ``handle`` itself.
                if action not in self.allowed_actions:
                    self.loging('Unknown action %r ignored' % (action,))
                    continue
                if action != 'auth' and not self.user_home_dir:
                    self.loging('Action %s rejected: not logged in' % action)
                    break
                getattr(self, action)(cmd_dic)
            except Exception as e:
                # Top-level boundary of the connection: log and drop it.
                self.loging(str(e))
                break


def run(host='0.0.0.0', port=6969):
    """Start the threaded server and serve until interrupted.

    :param host: interface to bind to (all interfaces by default).
    :param port: TCP port to listen on.
    """
    print("The server is started,and listenning at port %d" % port)
    with socketserver.ThreadingTCPServer((host, port), Ftp_server) as server:
        server.serve_forever()


if __name__ == '__main__':
    run()
設置用戶口令代碼:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
#Author by Andy
#_*_ coding:utf-8 _*_
"""Create and locate the user database consumed by the FTP server.

The database is a JSON object mapping each user name to the hexadecimal MD5
digest of that user's password.
"""
import hashlib
import json
import os
import sys

base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# os.path.join instead of a hard-coded "\\" separator, so the path is also
# valid on POSIX systems (on Windows the resulting path is unchanged).
userdb_file = os.path.join(base_dir, "data", "userdb")


def hash_password(password):
    """Return the hexadecimal MD5 digest of *password* (a ``str``).

    NOTE(review): unsalted MD5 is weak for password storage; it is kept so
    the stored format does not change for existing databases and clients.
    """
    return hashlib.md5(password.encode()).hexdigest()


def build_user_db(user_data):
    """Return ``{username: digest}`` for a ``{username: password}`` mapping.

    Every password is hashed independently with a fresh hash object, so one
    user's digest never depends on another user's password.
    """
    return {name: hash_password(password)
            for name, password in user_data.items()}


def userdb_set():
    """Return the path of the user database, creating it first if missing.

    When the database file does not exist, user names and passwords are read
    interactively until ``exit`` is entered at either prompt; the collected
    users are then written to the file as JSON.

    :return: path of the user database file.
    """
    if os.path.isfile(userdb_file):
        return userdb_file

    print('請先為您的服務器創建用戶!')
    user_data = {}
    while True:
        username = input("Please input username:")
        if username == 'exit':
            break
        password = input("Please input passwod:")
        if password == 'exit':
            break
        user_data[username] = password

    # The data directory may not exist on a fresh checkout.
    os.makedirs(os.path.dirname(userdb_file), exist_ok=True)
    with open(userdb_file, 'w') as f:
        json.dump(build_user_db(user_data), f)
    return userdb_file
目錄結構:
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。