利用ESP32 + micropython +继电器做一个手机控制的物联网开关
先理一下思路和原理
接线图:
micropython main.py 程序代码:
from umqtt.simple import MQTTClient
from machine import SoftI2C, Pin
from machine import Timer
from ssd1306 import SSD1306_I2C
from font import Font
from scron.week import simple_cron
from scron.decorators import run_times, call_counter, time_since_last_call
import _thread
import ubinascii
import network
import time
import random
import json
import bluetooth
import machine
scl_pin = Pin(7) # 或 Pin.OUT, Pin.LOW,具体取决于你的需求
sda_pin = Pin(6)
try:
i2c = SoftI2C(scl = scl_pin,sda = sda_pin,freq = 10000)
# 扫描 I2C 设备
devices = i2c.scan()
if not devices:
print("No I2C devices found")
else:
print("I2C devices found:", devices)
oled = SSD1306_I2C(128,64,i2c)
print("SSD1306加载正确")
def dispinfo(text1, text2, text3):
f=Font(oled)
oled.fill(0)
f.text(text1,0,0,16)
f.text(text2,0,16,16)
f.text(text3,0,32,16)
f.show()
print(text1 + " , " + text2 + " , " + text3)
def dispinfo2(text3):
print(text3)
f=Font(oled)
f.text(text3,0,32,16)
f.show()
def dispinfo3(text4):
print(text4)
f=Font(oled)
f.text(text4,0,48,16)
f.show()
except Exception as e:
print("An error occurred:", e)
print("SSD1306错误")
def dispinfo(text1, text2, text3):
print(text1 + " , " + text2 + " , " + text3)
def dispinfo2(text3):
print(text3)
def dispinfo3(text4):
print(text4)
BLE_MSG = ""
dispinfo("now booting...", "", "")
client_id = "slim_id"
mserver = '127.0.0.1' #mqtt服务器ip
port = 1883 #mqtt服务器port
mqttu = "mqtt" #mqtt服务器ip
mqttp = "mqtt" #mqtt服务器ip
deviceID = "esp32c3_1234"
topic_ctl = b'/dev/' + deviceID + '/ctl' # 设备订阅的主题,客户端推送消息的主题
topic_sta = b'/dev/' + deviceID + '/sta' # 客户端订阅的主题,设备推送消息的主题
client = None
wlan = None
wlanip = ""
Mac = ""
#led2 = Pin(12, Pin.OUT, value=0)
led1 = Pin(13, Pin.OUT, value=0)
relay = Pin(10, Pin.OUT, value=0)
def sub_callback(topic, msg):
global wlanip
global client
print((topic_ctl, msg))
if msg == b'1,0,0,0':
pub_msg = '0,0,0,0,' + wlanip
led1.value(0)
#led2.value(0)
relay.value(0)
dispinfo2("Set relay close")
elif msg == b'1,1,1,1':
pub_msg = '0,1,1,1,' + wlanip
led1.value(1)
#led2.value(1)
relay.value(1)
dispinfo2("Set relay open ")
else:
pub_msg = '0'
client.publish(topic_sta, pub_msg, retain=True)
def reset_wifi():
wlan = network.WLAN(network.STA_IF)
if wlan.isconnected():
wlan.disconnect() # 断开当前连接
wlan.active(False) # 禁用 Wi-Fi
wlan.active(True) # 重新启用 Wi-Fi
def wifi_connect(ssid, pwd):
global wlanip
global Mac
reset_wifi()
print("开始连接wifi ssid:" + ssid + " pwd::" + pwd)
station = network.WLAN(network.STA_IF) # 配置WiFi的STA模式
station.active(True) # 开启esp32的WiFi
station.connect(ssid, pwd) # 进行WiFi连接
count = 0
while station.isconnected() == False: # 检测是否连接成功
count += 1
time.sleep(1)
if count > 8: # 如果检测到大于8次没有连接成功(8秒后还没有联网)关闭WiFi并返回
station.active(False)
return 0
print("Connection successful" + station.ifconfig()[0])
wlanip = station.ifconfig()[0]
Mac = network.WLAN().config('mac')
print(station.ifconfig())
led = Pin(12, Pin.OUT) # 连接成功后配置LED引脚为输出模式,让LED灯长亮
led.value(1)
return 1
hearts=0
def cron_task(scorn_instance, callback_name, pointer, memory):
if 'counter' in memory:
memory['counter'] += 1
else:
memory['counter'] = 1
# 心跳 0,0,0,0,wlanip 的格式 第1个 0是心跳包 2 3 4为引脚状态
client.publish(topic_sta,
"0," + str(led1.value()) + ",0," + str(relay.value()) + "," + wlanip,
retain=True)
global hearts
if hearts<99999:
hearts=hearts + 1
else:
hearts=0
dispinfo3("HEARTBEAT:" + str(hearts))
class ESP32_BLE():
def __init__(self, name):
self.led = Pin(12, Pin.OUT) # 配置LED灯引脚为输出模式
self.timer1 = Timer(0) # 配置定时器
self.name = name
self.ble = bluetooth.BLE() # 创建蓝牙对象
self.ble.active(True) # 开启蓝牙
self.ble.config(gap_name=name) # 配置蓝牙信息
self.disconnected() # 设置定时器中断
self.ble.irq(self.ble_irq) # 蓝牙时间处理
self.register() # 配置蓝牙的uuid
self.ble.gatts_write(self.rx, bytes(100)) # 默认蓝牙只接收20字节,这里更改为接收100字节
self.advertiser() # 蓝牙广播
self.ok = 0
self.BLE_MSG = ""
def disconnected(self):
self.timer1.init(period=100, mode=Timer.PERIODIC, callback=lambda t: self.led.value(not self.led.value()))
def register(self):
service_uuid = '6E400001-B5A3-F393-E0A9-E50E24DCCA9E'
reader_uuid = '6E400002-B5A3-F393-E0A9-E50E24DCCA9E'
sender_uuid = '6E400003-B5A3-F393-E0A9-E50E24DCCA9E'
services = (
(
bluetooth.UUID(service_uuid),
(
(bluetooth.UUID(sender_uuid), bluetooth.FLAG_NOTIFY),
(bluetooth.UUID(reader_uuid), bluetooth.FLAG_WRITE),
)
),
)
((self.tx, self.rx,),) = self.ble.gatts_register_services(services)
def advertiser(self):
name = bytes(self.name, 'UTF-8')
adv_data = bytearray(b'\x02\x01\x02') + bytearray((len(name) + 1, 0x09)) + name
self.ble.gap_advertise(100, adv_data)
def ble_irq(self, event, data):
global BLE_MSG
if event == 1: # 手机连接了此设备
# dispinfo("Ble Connected","","")
self.connected()
elif event == 2: # 手机断开此设备
if self.ok == 0:
# dispinfo("Ble Disconnected","","")
self.advertiser()
self.disconnected()
elif event == 3: # 手机发送了数据
buffer = self.ble.gatts_read(self.rx)
BLE_MSG = buffer.decode('UTF-8').strip()
self.wifi_get_data()
#_thread.start_new_thread(self.process_ble_data, ()) # 使用线程处理数据
def connected(self):
self.timer1.deinit()
self.led.value(0)
def wifi_get_data(self):
if len(BLE_MSG) > 0:
ssid, password = BLE_MSG.split('|') # 分离信息
config = dict(ssid=ssid, password=password)
with open('wifi_config', 'w+') as f: # 将字典转成json写入wifi_config文件中
f.write(json.dumps(config))
machine.reset() #重启
connected = False #
def runmqtt(wlanip, mac):
global topic_ctl
global topic_sta
global client_id
global mserver
global port
global mqttu
global mqttp
global deviceID
global client
#print("wifi ok")
wifi_connect_flag = 1
#print('连接wifi成功' + wlanip)
deviceID = "esp32c3_" + ubinascii.hexlify(mac, ':').decode()
topic_ctl = b'/dev/' + deviceID + '/ctl' # 设备订阅的主题,客户端推送消息的主题
topic_sta = b'/dev/' + deviceID + '/sta' # 客户端订阅的主题,设备推送消息的主题
dispinfo("Wifi connection", "successful", "W" + wlanip)
client = MQTTClient(client_id + str(random.randint(1000, 9999)), mserver, port, mqttu, mqttp)
client.set_callback(sub_callback)
client.connect()
client.subscribe(topic_ctl)
client.publish(topic_sta, deviceID + ' online', retain=True)
print("Connected to %s, subscribed to %s topic" % (mserver, topic_ctl))
dispinfo("W" + wlanip, "S" + mserver, "MqttConnected")
simple_cron.add('every 60 seconds', cron_task, seconds=range(0, 59, 60))
simple_cron.run()
while True:
client.wait_msg()
try:
with open('wifi_config', 'r+') as f: # 尝试打开wifi_config文件
config = json.loads(f.read()) # 获取wifi_config的信息
print("ssid::" + config['ssid'])
print("password::" + config['password'])
wifi_connect_ok = wifi_connect(config['ssid'], config['password']) # 进行WiFi连接
if wifi_connect_ok:
runmqtt(wlanip, network.WLAN().config('mac'))
else:
print("wifi 错误")
dispinfo("Wifi Notlink", "Start BleServer", "format ssid|pass")
ble = ESP32_BLE("WuTong蓝牙配网") # 如果连接失败则进入蓝牙配网模式
except OSError:
print("wifi 错误")
dispinfo("Wifi Notlink", "Start BleServer", "format ssid|pass")
ble = ESP32_BLE("WuTong蓝牙配网") #如果查找不到文件则进入蓝牙配网模式
后端GO 程序代码
待更新....