利用ESP32 + micropython +继电器做一个手机控制的物联网开关

post-thumb

先理一下思路和原理

接线图:

micropython   main.py 程序代码:

from umqtt.simple import MQTTClient
from machine import SoftI2C, Pin
from machine import Timer
from ssd1306 import SSD1306_I2C
from font import Font
from scron.week import simple_cron
from scron.decorators import run_times, call_counter, time_since_last_call
import _thread
import ubinascii
import network
import time
import random
import json
import bluetooth
import machine

scl_pin = Pin(7)  # 或 Pin.OUT, Pin.LOW,具体取决于你的需求
sda_pin = Pin(6)


try:
    i2c = SoftI2C(scl = scl_pin,sda = sda_pin,freq = 10000)


    # 扫描 I2C 设备
    devices = i2c.scan()
    if not devices:
        print("No I2C devices found")
    else:
        print("I2C devices found:", devices)


    oled = SSD1306_I2C(128,64,i2c)

    print("SSD1306加载正确")
    def dispinfo(text1, text2, text3):
        f=Font(oled)
        oled.fill(0)
        f.text(text1,0,0,16)
        f.text(text2,0,16,16)
        f.text(text3,0,32,16)
        f.show()
        print(text1 + " , " + text2 + " , " + text3)
    def dispinfo2(text3):
        print(text3)
        f=Font(oled)
        f.text(text3,0,32,16)
        f.show()
    def dispinfo3(text4):
        print(text4)
        f=Font(oled)
        f.text(text4,0,48,16)
        f.show()
except Exception as e:
    print("An error occurred:", e)
    print("SSD1306错误")
    def dispinfo(text1, text2, text3):
        print(text1 + " , " + text2 + " , " + text3)
    def dispinfo2(text3):
        print(text3)
    def dispinfo3(text4):
        print(text4)

BLE_MSG = ""
dispinfo("now booting...", "", "")
client_id = "slim_id"
mserver = '127.0.0.1' #mqtt服务器ip  
port = 1883   #mqtt服务器port
mqttu = "mqtt"  #mqtt服务器ip  
mqttp = "mqtt"  #mqtt服务器ip  
deviceID = "esp32c3_1234"
topic_ctl = b'/dev/' + deviceID + '/ctl'  # 设备订阅的主题,客户端推送消息的主题
topic_sta = b'/dev/' + deviceID + '/sta'  # 客户端订阅的主题,设备推送消息的主题
client = None
wlan = None
wlanip = ""

Mac = ""

#led2 = Pin(12, Pin.OUT, value=0)
led1 = Pin(13, Pin.OUT, value=0)
relay = Pin(10, Pin.OUT, value=0)


def sub_callback(topic, msg):
    global wlanip
    global client
    print((topic_ctl, msg))
    if msg == b'1,0,0,0':
        pub_msg = '0,0,0,0,' + wlanip
        led1.value(0)
        #led2.value(0)
        relay.value(0)
        dispinfo2("Set relay close")
    elif msg == b'1,1,1,1':
        pub_msg = '0,1,1,1,' + wlanip
        led1.value(1)
        #led2.value(1)
        relay.value(1)
        dispinfo2("Set relay open ")
    else:
        pub_msg = '0'
    client.publish(topic_sta, pub_msg, retain=True)


def reset_wifi():
    wlan = network.WLAN(network.STA_IF)
    if wlan.isconnected():
        wlan.disconnect()  # 断开当前连接
    wlan.active(False)     # 禁用 Wi-Fi
    wlan.active(True)      # 重新启用 Wi-Fi
def wifi_connect(ssid, pwd):
    global wlanip
    global Mac
    reset_wifi()
    print("开始连接wifi  ssid:" + ssid + "  pwd::" + pwd)
    station = network.WLAN(network.STA_IF)  # 配置WiFi的STA模式
    station.active(True)  # 开启esp32的WiFi
    station.connect(ssid, pwd)  # 进行WiFi连接
    count = 0
    while station.isconnected() == False:  # 检测是否连接成功
        count += 1
        time.sleep(1)
        if count > 8:  # 如果检测到大于8次没有连接成功(8秒后还没有联网)关闭WiFi并返回
            station.active(False)
            return 0
    print("Connection successful" + station.ifconfig()[0])
    wlanip = station.ifconfig()[0]
    Mac = network.WLAN().config('mac')
    print(station.ifconfig())
    led = Pin(12, Pin.OUT)  # 连接成功后配置LED引脚为输出模式,让LED灯长亮
    led.value(1)
    return 1

hearts=0
def cron_task(scorn_instance, callback_name, pointer, memory):
    if 'counter' in memory:
        memory['counter'] += 1
    else:
        memory['counter'] = 1
    # 心跳  0,0,0,0,wlanip 的格式   第1个 0是心跳包  2 3 4为引脚状态
    client.publish(topic_sta,
                   "0," + str(led1.value()) + ",0," + str(relay.value()) + "," + wlanip,
                   retain=True)
    global hearts
    if hearts<99999:
        hearts=hearts + 1
    else:
        hearts=0
    dispinfo3("HEARTBEAT:" + str(hearts))



class ESP32_BLE():
    def __init__(self, name):
        self.led = Pin(12, Pin.OUT)  # 配置LED灯引脚为输出模式
        self.timer1 = Timer(0)  # 配置定时器
        self.name = name
        self.ble = bluetooth.BLE()  # 创建蓝牙对象
        self.ble.active(True)  # 开启蓝牙
        self.ble.config(gap_name=name)  # 配置蓝牙信息
        self.disconnected()  # 设置定时器中断
        self.ble.irq(self.ble_irq)  # 蓝牙时间处理
        self.register()  # 配置蓝牙的uuid
        self.ble.gatts_write(self.rx, bytes(100))  # 默认蓝牙只接收20字节,这里更改为接收100字节
        self.advertiser()  # 蓝牙广播
        self.ok = 0
        self.BLE_MSG = ""
    def disconnected(self):
        self.timer1.init(period=100, mode=Timer.PERIODIC, callback=lambda t: self.led.value(not self.led.value()))

    def register(self):
        service_uuid = '6E400001-B5A3-F393-E0A9-E50E24DCCA9E'
        reader_uuid = '6E400002-B5A3-F393-E0A9-E50E24DCCA9E'
        sender_uuid = '6E400003-B5A3-F393-E0A9-E50E24DCCA9E'

        services = (
            (
                bluetooth.UUID(service_uuid),
                (
                    (bluetooth.UUID(sender_uuid), bluetooth.FLAG_NOTIFY),
                    (bluetooth.UUID(reader_uuid), bluetooth.FLAG_WRITE),
                )
            ),
        )

        ((self.tx, self.rx,),) = self.ble.gatts_register_services(services)

    def advertiser(self):
        name = bytes(self.name, 'UTF-8')
        adv_data = bytearray(b'\x02\x01\x02') + bytearray((len(name) + 1, 0x09)) + name
        self.ble.gap_advertise(100, adv_data)

    def ble_irq(self, event, data):
        global BLE_MSG
        if event == 1:  # 手机连接了此设备
            # dispinfo("Ble Connected","","")
            self.connected()
        elif event == 2:  # 手机断开此设备
            if self.ok == 0:
                # dispinfo("Ble Disconnected","","")
                self.advertiser()
                self.disconnected()
        elif event == 3:  # 手机发送了数据
            buffer = self.ble.gatts_read(self.rx)
            BLE_MSG = buffer.decode('UTF-8').strip()
            self.wifi_get_data()
            #_thread.start_new_thread(self.process_ble_data, ())  # 使用线程处理数据



    def connected(self):
        self.timer1.deinit()
        self.led.value(0)


    def wifi_get_data(self):
        if len(BLE_MSG) > 0:
            ssid, password = BLE_MSG.split('|')  # 分离信息

            config = dict(ssid=ssid, password=password)
            with open('wifi_config', 'w+') as f:  # 将字典转成json写入wifi_config文件中
                f.write(json.dumps(config))
            machine.reset() #重启


connected = False  #

def runmqtt(wlanip, mac):
    global topic_ctl
    global topic_sta
    global client_id
    global mserver
    global port
    global mqttu
    global mqttp
    global deviceID
    global client
    #print("wifi ok")
    wifi_connect_flag = 1
    #print('连接wifi成功' + wlanip)

    deviceID = "esp32c3_" + ubinascii.hexlify(mac, ':').decode()
    topic_ctl = b'/dev/' + deviceID + '/ctl'  # 设备订阅的主题,客户端推送消息的主题
    topic_sta = b'/dev/' + deviceID + '/sta'  # 客户端订阅的主题,设备推送消息的主题

    dispinfo("Wifi connection", "successful", "W" + wlanip)
    client = MQTTClient(client_id + str(random.randint(1000, 9999)), mserver, port, mqttu, mqttp)
    client.set_callback(sub_callback)
    client.connect()
    client.subscribe(topic_ctl)
    client.publish(topic_sta, deviceID + ' online', retain=True)
    print("Connected to %s, subscribed to %s topic" % (mserver, topic_ctl))
    dispinfo("W" + wlanip, "S" + mserver, "MqttConnected")

    simple_cron.add('every 60 seconds', cron_task, seconds=range(0, 59, 60))
    simple_cron.run()
    while True:
        client.wait_msg()


try:
    with open('wifi_config', 'r+') as f:  # 尝试打开wifi_config文件
        config = json.loads(f.read())  # 获取wifi_config的信息

    print("ssid::" + config['ssid'])
    print("password::" + config['password'])
    wifi_connect_ok = wifi_connect(config['ssid'], config['password'])  # 进行WiFi连接
    if wifi_connect_ok:
        runmqtt(wlanip, network.WLAN().config('mac'))
    else:
        print("wifi 错误")
        dispinfo("Wifi Notlink", "Start BleServer", "format ssid|pass")
        ble = ESP32_BLE("WuTong蓝牙配网")  # 如果连接失败则进入蓝牙配网模式
except OSError:
    print("wifi 错误")
    dispinfo("Wifi Notlink", "Start BleServer", "format ssid|pass")
    ble = ESP32_BLE("WuTong蓝牙配网")  #如果查找不到文件则进入蓝牙配网模式

 

后端GO 程序代码

待更新....