一区二区三区在线-一区二区三区亚洲视频-一区二区三区亚洲-一区二区三区午夜-一区二区三区四区在线视频-一区二区三区四区在线免费观看

腳本之家,腳本語言編程技術(shù)及教程分享平臺(tái)!
分類導(dǎo)航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|

服務(wù)器之家 - 腳本之家 - Python - python實(shí)現(xiàn)NB-IoT模塊遠(yuǎn)程控制

python實(shí)現(xiàn)NB-IoT模塊遠(yuǎn)程控制

2021-03-07 00:09小爐灶 Python

這篇文章主要為大家詳細(xì)介紹了python實(shí)現(xiàn)NB-IoT模塊遠(yuǎn)程控制,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下

本來想嘗試下如果不使用運(yùn)營(yíng)商網(wǎng)絡(luò)應(yīng)用平臺(tái)情況下,只是在服務(wù)商服務(wù)器上是否可以實(shí)現(xiàn)對(duì)終端完全控制,如果這樣可行,那么物聯(lián)網(wǎng)應(yīng)用服務(wù)端更有靈活性。實(shí)際情況下,很難實(shí)現(xiàn)和運(yùn)營(yíng)商網(wǎng)絡(luò)對(duì)等的處理,用python代碼原型確實(shí)能夠?qū)崿F(xiàn)參數(shù)的變化(如PSM,eDXR等),但是終端分配的IP地址畢竟屬于接入網(wǎng)部分,更近似一個(gè)局域網(wǎng),如果采用其他方式訪問(如IMSI、IMEI等),還是需要與運(yùn)營(yíng)商核心網(wǎng)進(jìn)行配合。以下是嘗試遠(yuǎn)程控制的實(shí)現(xiàn)方法。

主要實(shí)現(xiàn)功能

1、使用python pyserial模塊通過串口發(fā)送AT命令給模組進(jìn)行參數(shù)修改,參考<使用python pyserial模塊串口通信>;
2、通過inter網(wǎng)進(jìn)行控制命令傳輸,選用UDP進(jìn)行主機(jī)控制,參考<python socket網(wǎng)絡(luò)接口編程>;
3、直接通過NB-IoT無線網(wǎng)絡(luò)進(jìn)行控制命令的傳輸;
4、python多窗口處理服務(wù)器端程序,實(shí)現(xiàn)接收和發(fā)送同時(shí)進(jìn)行;

遠(yuǎn)程控制主機(jī)腳本

服務(wù)器端程序:監(jiān)測(cè)UDP對(duì)應(yīng)的端口號(hào),如果接收到register信息則返回allowed,然后進(jìn)入命令輸入狀態(tài),等待命令輸入完成,發(fā)送給終端,等待終端反饋,并接續(xù)下一個(gè)命令傳送。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#!/usr/bin/python3.6
import socket
import sys
import re
 
BUFFER_SIZE = 1024
TARGET_ADDR = ''
TARGET_PORT = 60000
TARGET = (TARGET_ADDR,TARGET_PORT)
 
ss = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
ss.bind(TARGET)
print("server online!! wait for register!")
 
data,addrRsv = ss.recvfrom(BUFFER_SIZE)
if not data:
  sys.exit(0)
else:
  print(data)
 
if(re.match(b'register',data)):
  ss.sendto(b'allowed',addrRsv)
else:
  ss.sendto(b'reject',addrRsv)
while True:
 #等待命令輸入
  aa = input('cmd > ')
  if not aa:
    break
  else:
    cmdV = aa+'\r'
    ss.sendto(cmdV.encode('utf-8'),addrRsv)
  #等待結(jié)果返回
  data,addrRsv = ss.recvfrom(BUFFER_SIZE)
  if not data:
    break
  else:
    print(data)
 
ss.close()

客戶主機(jī)程序:發(fā)送register并成功接收allowed后,等待控制命令,通過串口轉(zhuǎn)發(fā)給終端模塊,并接收終端模塊的反饋消息,返回給服務(wù)器側(cè)。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#!/usr/bin/python3.6
import serial
import sys
import os
import re
import socket
 
#初始化UART端口
ser = serial.Serial("COM5",9600,timeout=30)
#選擇相應(yīng)的協(xié)議類型UDP
ss = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
BUFFER_SIZE = 1024
TARGET_ADDR = 'IP address'
TARGET_PORT = 60000
TARGET = (TARGET_ADDR,TARGET_PORT)
 
aa = '開機(jī)命令'.encode('utf-8') #convert to bytes type
ser.write(aa)
while True:
  line = ser.readline()
  if not line:
    print("can not get cmd result, release!")
    sys.exit(0)
  print(line)
  if ( re.match(b'OK',line) ):
    break
ss.sendto(bytes('register','utf-8'),TARGET)
data,addrRsv = ss.recvfrom(BUFFER_SIZE)
if re.match(b'allowed',data):
  print('register successfully!')
  pass
else:
  print('register failure')
  sys.exit(0)
 
while True:
  data,addrRsv = ss.recvfrom(BUFFER_SIZE)
  if not data:
    print("time out,release now!!")
    break
  elif re.match(b'end',data):
    print("end of process!!")
    break;
  ser.write(data)
   
  while True:
    line = ser.readline()
    if not line:
      print("can not get cmd result, release!")
      break
    print(line)
    if ( re.match(b'OK',line) ):
      ss.sendto(bytes('OK','utf-8'),TARGET)
      break
    elif(re.match(b'ERROR',line)):
      ss.sendto(bytes('ERROR','utf-8'),TARGET)
      break
    else:
      pass
 
ser.close()

多線程窗口

為了使得服務(wù)器端能夠?qū)崿F(xiàn)同時(shí)實(shí)現(xiàn)接收和發(fā)送,可以在服務(wù)器端開啟兩個(gè)窗口進(jìn)行監(jiān)聽,示例如下:

啟動(dòng)代碼

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/usr/bin/python3.6
import threading
import time
import subprocess
import os
import sys
 
def thread_fun1():
 #global vlock
 while(1):
  print("thread fun1 is running!!!")
  time.sleep(1)
 
#... ...
print(len(sys.argv))
#vlock = threading.Lock()
t1 = threading.Thread(target=thread_fun1,args=())
t1.start()
addr = 'IP address'
port = 60000
cmdStr = "python anotherThread.py %s %d"%(addr,port)
#設(shè)置creationflags = subprocess.CREATE_NEW_CONSOLE,用來創(chuàng)建新的控制臺(tái)窗口
subprocess.Popen(cmdStr,creationflags = subprocess.CREATE_NEW_CONSOLE)

anotherThread.py

?
1
2
3
4
5
6
7
8
9
10
#!/usr/bin/python3.6
 
def thread_fun2():
 while(1):
  aa = input('cmd > ')
  print("thread fun2 is running!!!")
  print(aa)
  if(aa == 'end'):
   break
thread_fun2()

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持服務(wù)器之家。

原文鏈接:https://blog.csdn.net/dreambitbybit/article/details/79307401

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: vod国产成人精品视频 | 免费精品99久久国产综合精品 | 久久er国产精品免费观看2 | 免费看www| 国产成人福利美女观看视频 | 日韩中文字幕在线不卡 | 扒开老女人 | 国产成人成人一区二区 | 男男羞羞视频网站国产 | 太紧太深了受不了黑人 | 俄罗斯一级淫片 | 色哟哟哟在线精品观看视频 | 二次元美女脱裤子让男人桶爽 | 77久久| 放荡的女老板bd中文字幕 | 艾秋果冻麻豆老狼 | 东京道一本热大交乱 | 日本高清中文字幕一区二区三区 | 色婷婷综合和线在线 | 波多 在线播放 | 国产实拍会所女技师在线 | 免费午夜网站 | 久久婷婷丁香五月色综合啪免费 | 美女靠逼的视频 | 日本不卡免免费观看 | 欧美ⅹxxxhd3d| 国产成人小视频 | 亚洲精品一二区 | 青草免费在线 | 国产小情侣自拍 | 日本在线观看视频 | 亚洲 欧美 另类 中文 在线 | 91麻豆精品国产自产在线观看 | gogort人体的最新网站 | 2018亚洲男人天堂 | 久久视频在线视频 | 2021日本三级理论影院 | 国产精品www | 四虎欧美 | 欧美一级xxxx俄罗斯一级 | 明星裸乳照无奶罩 |