首先從ftplib模塊導入FTP類;
然後使用【os.chdir】切換本地工作路徑;
再使用“self.ftp.nlst()”獲取遠程目錄下的文件列表;
最後使用“self.ftp.retrbinary()”下載ftp文件即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
#!/usr/bin/python
# coding=utf-8
"""Minimal example: download a single file from an FTP server with ftplib."""
import os
from ftplib import FTP  # standard-library FTP client


class MyFtp:
    """Small wrapper around ``ftplib.FTP`` for downloading one file.

    Each instance owns its own ``FTP`` object, so several clients can be
    used side by side without sharing a connection.
    """

    def __init__(self, host, port=21):
        """Connect to ``host`` on ``port`` (no login is performed yet).

        Raises whatever ``FTP.connect`` raises when the server is unreachable.
        """
        # Created per instance: a class-level FTP() would be shared by every
        # MyFtp object and a second instance would clobber the first session.
        self.ftp = FTP()
        self.ftp.connect(host, port)

    def login(self, username, pwd):
        """Log in with the given credentials and print the server welcome."""
        self.ftp.set_debuglevel(2)  # debug level 2: show detailed protocol output
        self.ftp.login(username, pwd)
        print(self.ftp.welcome)

    def downloadFile(self, localpath, remotepath, filename):
        """Download ``filename`` from ``remotepath`` into ``localpath``.

        Side effect: the process working directory is changed to
        ``localpath``; the file is written there under ``filename``.
        """
        os.chdir(localpath)       # switch the working directory to the download folder
        self.ftp.cwd(remotepath)  # enter the remote directory that holds the file
        self.ftp.nlst()           # request the remote listing (the result is discarded)
        # The context manager guarantees the local file is flushed and closed
        # even when the transfer fails half-way.
        with open(filename, "wb") as file_handle:
            self.ftp.retrbinary(
                'RETR %s' % os.path.basename(filename),
                file_handle.write,
                blocksize=1024,
            )
        # To remove the remote copy afterwards, call self.ftp.delete(filename).

    def close(self):
        """Turn debugging off and end the FTP session politely."""
        self.ftp.set_debuglevel(0)
        self.ftp.quit()


if __name__ == '__main__':
    ftp = MyFtp('host')
    try:
        ftp.login('username', 'pwd')
        ftp.downloadFile(
            'E:\\RED\\workspace\\appAuto\\apk\\Android10',
            '/mobile/Android/release10/',
            'xxx.apk',
        )
    finally:
        # Always release the connection, even if login or download failed.
        ftp.close()
實例擴展:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
# coding=utf-8
'''
Automatic FTP download/upload script with recursive directory handling.
'''
from ftplib import FTP, all_errors
import os
import sys
import string
import datetime
import time
import socket


class MYFTP:
    """FTP client that mirrors directory trees to or from a server.

    Downloads and uploads skip files whose local and remote sizes are both
    known and equal.
    """

    def __init__(self, hostaddr, username, password, remotedir, port=21):
        """Store the connection settings; no network access happens here."""
        self.hostaddr = hostaddr
        self.username = username
        self.password = password
        self.remotedir = remotedir
        self.port = port
        self.ftp = FTP()
        self.file_list = []  # [type_char, name] entries collected by get_file_list

    def __del__(self):
        # getattr guard: the instance may be only partially constructed.
        ftp = getattr(self, 'ftp', None)
        if ftp is not None:
            ftp.close()

    def login(self):
        """Connect, log in and change to ``self.remotedir``.

        Returns True when every step succeeded, False otherwise; a message
        is printed for the step that failed.
        """
        ftp = self.ftp
        try:
            timeout = 300
            socket.setdefaulttimeout(timeout)
            ftp.set_pasv(True)
            print(u'開始連接到 %s' % (self.hostaddr))
            ftp.connect(self.hostaddr, self.port)
            print(u'成功連接到 %s' % (self.hostaddr))
            print(u'開始登錄到 %s' % (self.hostaddr))
            ftp.login(self.username, self.password)
            print(u'成功登錄到 %s' % (self.hostaddr))
            debug_print(ftp.getwelcome())
        except all_errors:
            print(u'連接或登錄失敗')
            # Without a session the directory change below cannot succeed.
            return False
        try:
            ftp.cwd(self.remotedir)
        except all_errors:
            print(u'切換目錄失敗')
            return False
        return True

    def is_same_size(self, localfile, remotefile):
        """Return 1 when both sizes are known and equal, otherwise 0.

        An unknown size (missing local file, failed or unanswered SIZE
        request) is never treated as a match, so such files are transferred
        rather than silently skipped.
        """
        try:
            remotefile_size = self.ftp.size(remotefile)
        except all_errors:
            remotefile_size = -1
        if remotefile_size is None:
            # size() can return None when the reply is not a plain size answer.
            remotefile_size = -1
        try:
            localfile_size = os.path.getsize(localfile)
        except OSError:
            localfile_size = -1
        debug_print('localfile_size:%d remotefile_size:%d'
                    % (localfile_size, remotefile_size))
        if localfile_size < 0 or remotefile_size < 0:
            return 0
        return 1 if remotefile_size == localfile_size else 0

    def download_file(self, localfile, remotefile):
        """Download ``remotefile`` to ``localfile`` unless the sizes match."""
        if self.is_same_size(localfile, remotefile):
            debug_print(u'%s 文件大小相同,無需下載' % localfile)
            return
        debug_print(u'>>>>>>>>>>>>下載文件 %s ... ...' % localfile)
        # The context manager closes the file even if the transfer raises.
        with open(localfile, 'wb') as file_handler:
            self.ftp.retrbinary(u'RETR %s' % (remotefile), file_handler.write)

    def download_files(self, localdir='./', remotedir='./'):
        """Recursively download ``remotedir`` into ``localdir``.

        ``remotedir`` is resolved relative to the current remote directory;
        on return the remote working directory is its parent.
        """
        try:
            self.ftp.cwd(remotedir)
        except all_errors:
            debug_print(u'目錄%s不存在,繼續...' % remotedir)
            return
        if not os.path.isdir(localdir):
            os.makedirs(localdir)
        debug_print(u'切換至目錄 %s' % self.ftp.pwd())
        # A fresh list per directory: recursive calls rebind self.file_list,
        # while the local name below keeps this directory's entries.
        self.file_list = []
        self.ftp.dir(self.get_file_list)
        remotenames = self.file_list
        for filetype, filename in remotenames:
            local = os.path.join(localdir, filename)
            if filetype == 'd':
                self.download_files(local, filename)
            elif filetype == '-':
                self.download_file(local, filename)
        self.ftp.cwd('..')
        debug_print(u'返回上層目錄 %s' % self.ftp.pwd())

    def upload_file(self, localfile, remotefile):
        """Upload ``localfile`` as ``remotefile`` unless the sizes match."""
        if not os.path.isfile(localfile):
            return
        if self.is_same_size(localfile, remotefile):
            debug_print(u'跳過[相等]: %s' % localfile)
            return
        with open(localfile, 'rb') as file_handler:
            self.ftp.storbinary('STOR %s' % remotefile, file_handler)
        debug_print(u'已傳送: %s' % localfile)

    def upload_files(self, localdir='./', remotedir='./'):
        """Recursively upload ``localdir`` into ``remotedir``.

        On return the remote working directory is the parent of ``remotedir``.
        """
        if not os.path.isdir(localdir):
            return
        localnames = os.listdir(localdir)
        self.ftp.cwd(remotedir)
        for item in localnames:
            src = os.path.join(localdir, item)
            if os.path.isdir(src):
                try:
                    self.ftp.mkd(item)
                except all_errors:
                    # MKD failed; the original script reports this as
                    # "already exists" and carries on.
                    debug_print(u'目錄已存在 %s' % item)
                self.upload_files(src, item)
            else:
                self.upload_file(src, item)
        self.ftp.cwd('..')

    def get_file_list(self, line):
        """Callback for ``FTP.dir``: record one listing line in ``file_list``."""
        file_arr = self.get_filename(line)
        if file_arr[1] not in ['.', '..']:
            self.file_list.append(file_arr)

    def get_filename(self, line):
        """Split one listing line into ``[type_char, name]``.

        Assumes the nine-column Unix ``ls -l`` layout (permissions, links,
        owner, group, size, month, day, time-or-year, name). Splitting on
        the first eight whitespace runs keeps names that contain spaces or
        colons intact and also handles entries that show a year instead of
        a time. Shorter lines fall back to their last field.
        """
        fields = line.split(None, 8)
        if len(fields) == 9:
            name = fields[8]
        elif fields:
            name = fields[-1]
        else:
            name = ''
        # line[:1] rather than line[0] so an empty line cannot raise.
        return [line[:1], name]


def debug_print(s):
    """Print a progress or diagnostic message."""
    print(s)


if __name__ == '__main__':
    # Configure the following variables.
    hostaddr = '211.15.113.45'   # FTP server address
    username = 'UserName'        # user name
    password = '123456'          # password
    port = 21                    # port number
    rootdir_local = 'E:/mypiv'   # local directory
    rootdir_remote = '/PIV'      # remote directory

    f = MYFTP(hostaddr, username, password, rootdir_remote, port)
    # Only mirror and report success when the session was established.
    if f.login():
        f.download_files(rootdir_local, rootdir_remote)
        datenow = time.strftime('%Y-%m-%d', time.localtime())
        logstr = u"%s 成功執行了備份\n" % datenow
        debug_print(logstr)
到此這篇關於python實現從ftp上下載文件的實例方法的文章就介紹到這了,更多相關python怎麼實現從ftp上下載文件的內容,請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章,希望大家以後多多支持服務器之家!
原文鏈接:https://www.py.cn/jishu/gaoji/19626.html