项目目标
esp8266设备a读取mpu6050传感器数据,通过mqtt发送数据;
编写代码
main.py
- import network
- import time
- import json
- from umqtt.simple import mqttclient
- from machine import i2c
- from machine import pin
- import mpu6050
- device_id = "esp0007"
- wifi_ssid='fast_20cc'
- wifi_password='409409409'
- mqtt_serverip='192.168.1.113'
- mqtt_serverport=1883
- mqtt_clientid=device_id
- mqtt_publishtopic=b'espiot-control'
- message_template = {}
- message_template['source_device'] = mqtt_clientid
- message_template['target_device'] = 'server'
- message_template['msg_type'] = 'reportrotateencoder'
- def connectionwifi(ssid, password):
- wlan = network.wlan(network.sta_if)
- wlan.active(true)
- wlan.connect(ssid, password)
- while true:
- if not wlan.isconnected():
- print ("connecting...")
- else:
- print('connected to network')
- break
- time.sleep(1)
- if __name__== '__main__':
- connectionwifi(wifi_ssid,wifi_password)
-
- client = mqttclient(mqtt_clientid,mqtt_serverip,mqtt_serverport)
- client.connect()
-
- i2c = i2c(scl=pin(5), sda=pin(4)) #initializing the i2c method for esp8266
- mpu= mpu6050.accel(i2c)
- while true:
- mpu_data = mpu.get_values()
- print(mpu_data)
- message_template['mpu'] = mpu_data
- client.publish(mqtt_publishtopic,json.dumps(message_template))
- time.sleep_ms(100)
复制代码
- import machine
- class accel():
- def __init__(self, i2c, addr=0x68):
- self.iic = i2c
- self.addr = addr
- self.iic.start()
- self.iic.writeto(self.addr, bytearray([107, 0]))
- self.iic.stop()
- def get_raw_values(self):
- self.iic.start()
- a = self.iic.readfrom_mem(self.addr, 0x3b, 14)
- self.iic.stop()
- return a
- def get_ints(self):
- b = self.get_raw_values()
- c = []
- for i in b:
- c.append(i)
- return c
- def bytes_toint(self, firstbyte, secondbyte):
- if not firstbyte & 0x80:
- return firstbyte << 8 | secondbyte
- return - (((firstbyte ^ 255) << 8) | (secondbyte ^ 255) 1)
- def get_values(self):
- raw_ints = self.get_raw_values()
- vals = {}
- vals["acx"] = self.bytes_toint(raw_ints[0], raw_ints[1])
- vals["acy"] = self.bytes_toint(raw_ints[2], raw_ints[3])
- vals["acz"] = self.bytes_toint(raw_ints[4], raw_ints[5])
- vals["tmp"] = self.bytes_toint(raw_ints[6], raw_ints[7]) / 340.00 36.53
- vals["gyx"] = self.bytes_toint(raw_ints[8], raw_ints[9])
- vals["gyy"] = self.bytes_toint(raw_ints[10], raw_ints[11])
- vals["gyz"] = self.bytes_toint(raw_ints[12], raw_ints[13])
- return vals # returned in range of int16
- # -32768 to 32767
- def val_test(self): # only for testing! also, fast reading sometimes crashes iic
- from time import sleep
- while 1:
- print(self.get_values())
- sleep(0.05)
复制代码 项目截图
|