步驟1:俄勒岡科學氣象站BLE協議
探索氣象站公開的BLE服務的一種簡單方法是在智能手機或平板電腦上使用BLE掃描儀應用程序。在這里,我將使用Android上的Nordic Semiconductor的nRF Connect。
對于不熟悉BLE的人們,您可能想開始使用此低功耗藍牙入門指南。這將幫助您了解GATT,服務和特性之類的術語。
借助BLE掃描器應用程序,您將找到周圍的所有BLE設備。并且您會看到俄勒岡科學氣象站正在將其自己廣告為 IDTW211R 。
如果連接到氣象站,您將獲得設備公開的服務列表
通用訪問UUID:0x1800
通用屬性UUID:0x1801
專有服務UUID:74e7fe00-c6a4-11e2-b7a9-0002a5d5c51b
設備信息UUID:0x180A
電池服務UUID:0x180F
對我們而言重要的服務是專有服務(UUID:74e7fe00-c6a4- 11E2-b7a9-0002a5d5c51b)。因此,我們單擊該服務并獲得該服務公開的特征列表。
我們的應用程序的相關特征是UUID:74e78e10-c6a4-11e2-b7a9-0002a5d5c51b(INDOOR_AND_CH1_TO_3_TH_DATA)。
如屬性中所示,此特征不可讀,但提供指示服務。而且我們將必須通過相應的客戶端特征配置描述符(UUID:0x2902)啟用指示。
事實證明,必須為設備的所有特征啟用指示/通知,才能開始獲取任何指示。
步驟2:將Raspberry Pi3連接到BLE氣象站
現在,我們對如何通過BLE檢索溫度數據有了更好的了解,讓我們嘗試讓Raspberry Pi與氣象站進行通訊。
Raspberry Pi上的BLE
我將使用Raspberry Pi3(集成了WLAN/BT控制器)和具有像素2016-09-23的Raspbian Jessie。
bluez (5.23)的版本此Raspbian版本已經過時了,我們必須升級到 bluez 的更高版本才能使用Bluetooth Low Energy。
下載 bluez 5.43 》并安裝所需的依賴項:
sudo apt-ge t update
sudo apt-get install -y libglib2.0-dev libdbus-1-dev libudev-dev libical-dev libreadline-dev
wget http://www.kernel .org/pub/linux/bluetooth/bluez-5.43.tar.xz
tar xvf bluez-5.43.tar.xz
配置,構建和安裝 bluez
。/configure
make
sudo make install
最后重新啟動Raspberry Pi。
現在,讓我們使用cmd hciconfig dev
驗證藍牙堆棧是否已啟動并且正在運行,然后確認我們可以掃描BLE設備: sudo hcitool lescan
我們應該看到氣象站的廣告為 IDTW211R 。
下一步是連接到氣象站:
sudo gatttool -b -t random -I
您應該獲得一個新的命令提示符,其BLE地址位于方括號之間。
connect
您應該獲得“連接成功”的提示,并變為彩色
連接后,我們可以運行一些命令以獲取有關設備的更多詳細信息。例如,主要命令將列出設備公開的服務。
從氣象站收集數據
如前所述,開始獲取有關在INDOOR_AND_CH1_TO_3_TH_DATA(UUID:74e78e10-c6a4-11e2-b7a9-0002a5d5c51b)中,我們必須啟用所有特征的指示/通知。
我們通過在客戶端特征配置描述符中寫入0x0002(用于通知的0x0001)來啟用指示。 (UUID:0x2902)每個特性。寫應該以小尾數形式放置,這樣: char-write-req 0200
作為回報,您應該開始獲得指示/通知。對我們而言,相關的是INDOOR_AND_CH1_TO_3_TH_DATA指示(因此句柄0x0017)。
指示句柄= 0x0017值:01 05 01 15 01 ff 7f ff 7f 7f 7f 7f 7f ff ff 7f 7f 7f 7f 7f 7f
指示句柄= 0x0017值:82 7f 7f 7f 21 01 f8 00 24 01 ba 00 ff 7f ff 7f ff 7f ff 7f
對于每一輪指示,我們得到兩個20的數據包每個字節。最高有效字節指示數據類型(類型0或類型1)。有關數據包的更多詳細信息,請參見上一張圖片。
步驟3:檢索氣象站數據的Python腳本
現在我們成功地從氣象站中獲取了數據,讓我們自動化了整個過程并理解了數據。
這是一個連接到氣象站,檢索數據并提取氣象站底座和隨附的無線傳感器的溫度。
此腳本利用bluepy庫提供了API,以允許從Python訪問Bluetooth Low Energy設備。因此,您必須在執行腳本之前安裝此模塊:https://github.com/IanHarvey/bluepy#installation
用法
可以使用MAC地址執行腳本
在后一種情況下,腳本將執行掃描并查找廣告為 IDTW211R 的設備。它必須以root特權執行,因為 bluez 需要 root 特權才能進行掃描操作。
python bleWeatherStation.py [mac-address]
sudo python bleWeatherStation.py
bleWeatherStation.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Connect to Oregon Scientific BLE Weather Station
# Copyright (c) 2016 Arnaud Balmelle
#
# This script will connect to Oregon Scientific BLE Weather Station
# and retrieve the temperature of the base and sensors attached to it.
# If no mac-address is passed as argument, it will scan for an Oregon Scientific BLE Weather Station.
#
# Supported Oregon Scientific Weather Station: EMR211 and RAR218HG (and probably BAR218HG)
#
# Usage: python bleWeatherStation.py [mac-address]
#
# Dependencies:
# - Bluetooth 4.1 and bluez installed
# - bluepy library (https://github.com/IanHarvey/bluepy)
#
# License: Released under an MIT license: http://opensource.org/licenses/MIT
import sys
import logging
import time
import sqlite3
from bluepy.btle import *
# uncomment the following line to get debug information
logging.basicConfig(format=‘%(asctime)s: %(message)s’, level=logging.DEBUG)
WEATHERSTATION_NAME = “IDTW211R” # IDTW213R for RAR218HG
class WeatherStation:
def __init__(self, mac):
self._data = {}
try:
self.p = Peripheral(mac, ADDR_TYPE_RANDOM)
self.p.setDelegate(NotificationDelegate())
logging.debug(‘WeatherStation connected !’)
except BTLEException:
self.p = 0
logging.debug(‘Connection to WeatherStation failed !’)
raise
def _enableNotification(self):
try:
# Enable all notification or indication
self.p.writeCharacteristic(0x000c, “x02x00”)
self.p.writeCharacteristic(0x000f, “x02x00”)
self.p.writeCharacteristic(0x0012, “x02x00”)
self.p.writeCharacteristic(0x0015, “x01x00”)
self.p.writeCharacteristic(0x0018, “x02x00”)
self.p.writeCharacteristic(0x001b, “x02x00”)
self.p.writeCharacteristic(0x001e, “x02x00”)
self.p.writeCharacteristic(0x0021, “x02x00”)
self.p.writeCharacteristic(0x0032, “x01x00”)
logging.debug(‘Notifications enabled’)
except BTLEException as err:
print(err)
self.p.disconnect()
def monitorWeatherStation(self):
try:
# Enable notification
self._enableNotification()
# Wait for notifications
while self.p.waitForNotifications(1.0):
# handleNotification() was called
continue
logging.debug(‘Notification timeout’)
except:
return None
regs = self.p.delegate.getData()
if regs is not None:
# expand INDOOR_AND_CH1_TO_3_TH_DATA_TYPE0
self._data[‘index0_temperature’] = ‘’.join(regs[‘data_type0’][4:6] + regs[‘data_type0’][2:4])
self._data[‘index1_temperature’] = ‘’.join(regs[‘data_type0’][8:10] + regs[‘data_type0’][6:8])
self._data[‘index2_temperature’] = ‘’.join(regs[‘data_type0’][12:14] + regs[‘data_type0’][10:12])
self._data[‘index3_temperature’] = ‘’.join(regs[‘data_type0’][16:18] + regs[‘data_type0’][14:16])
self._data[‘index0_humidity’] = regs[‘data_type0’][18:20]
self._data[‘index1_humidity’] = regs[‘data_type0’][20:22]
self._data[‘index2_humidity’] = regs[‘data_type0’][22:24]
self._data[‘index3_humidity’] = regs[‘data_type0’][24:26]
self._data[‘temperature_trend’] = regs[‘data_type0’][26:28]
self._data[‘humidity_trend’] = regs[‘data_type0’][28:30]
self._data[‘index0_humidity_max’] = regs[‘data_type0’][30:32]
self._data[‘index0_humidity_min’] = regs[‘data_type0’][32:34]
self._data[‘index1_humidity_max’] = regs[‘data_type0’][34:36]
self._data[‘index1_humidity_min’] = regs[‘data_type0’][36:38]
self._data[‘index2_humidity_max’] = regs[‘data_type0’][38:40]
# expand INDOOR_AND_CH1_TO_3_TH_DATA_TYPE1
self._data[‘index2_humidity_min’] = regs[‘data_type1’][2:4]
self._data[‘index3_humidity_max’] = regs[‘data_type1’][4:6]
self._data[‘index3_humidity_min’] = regs[‘data_type1’][6:8]
self._data[‘index0_temperature_max’] = ‘’.join(regs[‘data_type1’][10:12] + regs[‘data_type1’][8:10])
self._data[‘index0_temperature_min’] = ‘’.join(regs[‘data_type1’][14:16] + regs[‘data_type1’][12:14])
self._data[‘index1_temperature_max’] = ‘’.join(regs[‘data_type1’][18:20] + regs[‘data_type1’][16:18])
self._data[‘index1_temperature_min’] = ‘’.join(regs[‘data_type1’][22:24] + regs[‘data_type1’][20:22])
self._data[‘index2_temperature_max’] = ‘’.join(regs[‘data_type1’][26:28] + regs[‘data_type1’][24:26])
self._data[‘index2_temperature_min’] = ‘’.join(regs[‘data_type1’][30:32] + regs[‘data_type1’][28:30])
self._data[‘index3_temperature_max’] = ‘’.join(regs[‘data_type1’][34:36] + regs[‘data_type1’][32:34])
self._data[‘index3_temperature_min’] = ‘’.join(regs[‘data_type1’][38:40] + regs[‘data_type1’][36:38])
return True
else:
return None
def getValue(self, indexstr):
val = int(self._data[indexstr], 16)
if val 》= 0x8000:
val = ((val + 0x8000) & 0xFFFF) - 0x8000
return val
def getIndoorTemp(self):
if ‘index0_temperature’ in self._data:
temp = self.getValue(‘index0_temperature’) / 10.0
max = self.getValue(‘index0_temperature_max’) / 10.0
min = self.getValue(‘index0_temperature_min’) / 10.0
logging.debug(‘Indoor temp : %.1f°C, max : %.1f°C, min : %.1f°C’, temp, max, min)
return temp
else:
return None
def getOutdoorTemp(self):
if ‘index1_temperature’ in self._data:
temp = self.getValue(‘index1_temperature’) / 10.0
max = self.getValue(‘index1_temperature_max’) / 10.0
min = self.getValue(‘index1_temperature_min’) / 10.0
logging.debug(‘Outdoor temp : %.1f°C, max : %.1f°C, min : %.1f°C’, temp, max, min)
return temp
else:
return None
def disconnect(self):
self.p.disconnect()
class NotificationDelegate(DefaultDelegate):
def __init__(self):
DefaultDelegate.__init__(self)
self._indoorAndOutdoorTemp_type0 = None
self._indoorAndOutdoorTemp_type1 = None
def handleNotification(self, cHandle, data):
formatedData = binascii.b2a_hex(data)
if cHandle == 0x0017:
# indoorAndOutdoorTemp indication received
if formatedData[0] == ‘8’:
# Type1 data packet received
self._indoorAndOutdoorTemp_type1 = formatedData
logging.debug(‘indoorAndOutdoorTemp_type1 = %s’, formatedData)
else:
# Type0 data packet received
self._indoorAndOutdoorTemp_type0 = formatedData
logging.debug(‘indoorAndOutdoorTemp_type0 = %s’, formatedData)
else:
# skip other indications/notifications
logging.debug(‘handle %x = %s’, cHandle, formatedData)
def getData(self):
if self._indoorAndOutdoorTemp_type0 is not None:
# return sensors data
return {‘data_type0’:self._indoorAndOutdoorTemp_type0, ‘data_type1’:self._indoorAndOutdoorTemp_type1}
else:
return None
class ScanDelegate(DefaultDelegate):
def __init__(self):
DefaultDelegate.__init__(self)
def handleDiscovery(self, dev, isNewDev, isNewData):
global weatherStationMacAddr
if dev.getValueText(9) == WEATHERSTATION_NAME:
# Weather Station in range, saving Mac address for future connection
logging.debug(‘WeatherStation found’)
weatherStationMacAddr = dev.addr
if __name__==“__main__”:
weatherStationMacAddr = None
if len(sys.argv) 《 2:
# No MAC address passed as argument
try:
# Scanning to see if Weather Station in range
scanner = Scanner().withDelegate(ScanDelegate())
devices = scanner.scan(2.0)
except BTLEException as err:
print(err)
print(‘Scanning required root privilege, so do not forget to run the script with sudo.’)
else:
# Weather Station MAC address passed as argument, will attempt to connect with this address
weatherStationMacAddr = sys.argv[1]
if weatherStationMacAddr is None:
logging.debug(‘No WeatherStation in range !’)
else:
try:
# Attempting to connect to device with MAC address “weatherStationMacAddr”
weatherStation = WeatherStation(weatherStationMacAddr)
if weatherStation.monitorWeatherStation() is not None:
# WeatherStation data received
indoor = weatherStation.getIndoorTemp()
outdoor = weatherStation.getOutdoorTemp()
else:
logging.debug(‘No data received from WeatherStation’)
weatherStation.disconnect()
except KeyboardInterrupt:
logging.debug(‘Program stopped by user’)
-
氣象站
+關注
關注
1文章
753瀏覽量
15692 -
樹莓派
+關注
關注
116文章
1708瀏覽量
105695
發布評論請先 登錄
相關推薦
評論