Files
zeroPLC-installer/zeroPLCgui/gui.py
CarstenArend aac69b38dd initial commit
2025-09-03 12:23:27 +02:00

389 lines
12 KiB
Python

#!/usr/bin/env bash
'''exec' "$HOME/zeroPLCgui/.venv/bin/python3" "$0" "$@"
' '''
from nicegui import ui,app
import os,sys
import gpiod
import atexit
import board
import adafruit_tmp117
import adafruit_sht4x
from adafruit_extended_bus import ExtendedI2C as I2C
from picamera2 import Picamera2
from fastapi.responses import StreamingResponse
import io
from PIL import Image
import time
# Shared camera handle; remains None until init_camera() has started a camera.
picam2 = None
# Module-level camera flag, initialised to 0 (no camera).
camera_connected = 0


def init_camera():
    """Create, configure and start the Picamera2 preview stream.

    The camera is configured for a 640x480 'XRGB8888' preview.  The global
    ``picam2`` handle is only assigned once the camera has started, so a
    failure part-way through never leaves a half-initialised object visible
    to other code.

    Raises:
        Exception: whatever Picamera2 raises while opening, configuring or
            starting the camera; the error is re-raised unchanged.
    """
    global picam2
    camera = Picamera2()
    try:
        camera.configure(
            camera.create_preview_configuration(
                main={"format": 'XRGB8888', "size": (640, 480)}
            )
        )
        camera.start()
    except Exception:
        # Release the partially set-up camera before propagating the error.
        camera.close()
        raise
    picam2 = camera
def generate_mjpeg():
    """Yield camera frames as sections of a multipart MJPEG stream.

    Each yielded chunk is one JPEG image wrapped in a ``--frame`` multipart
    section; the generator then sleeps 0.1 s before the next capture.  When
    no camera has been initialised (``picam2`` is still ``None``) the
    generator ends immediately instead of failing with ``AttributeError``
    on the first capture.

    Yields:
        bytes: one multipart section containing a single JPEG frame.
    """
    if picam2 is None:
        # No camera available: produce an empty stream rather than crash.
        return
    while True:
        pixels = picam2.capture_array()
        pil_img = Image.fromarray(pixels)
        # JPEG cannot store an alpha channel, so RGBA frames are reduced to RGB.
        # NOTE(review): the camera is configured for 'XRGB8888'; confirm the
        # channel order matches PIL's RGBA, otherwise red/blue may be swapped.
        if pil_img.mode == 'RGBA':
            pil_img = pil_img.convert('RGB')
        with io.BytesIO() as buf:
            pil_img.save(buf, format='JPEG')
            frame = buf.getvalue()
        yield (
            b'--frame\r\n'
            b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n'
        )
        time.sleep(0.1)
@app.get('/video_feed')
def video_feed():
    """Serve the camera stream at ``/video_feed``.

    Returns:
        StreamingResponse: a ``multipart/x-mixed-replace`` response (part
        boundary ``frame``) fed by ``generate_mjpeg()``.
    """
    mjpeg_media_type = 'multipart/x-mixed-replace; boundary=frame'
    frames = generate_mjpeg()
    return StreamingResponse(frames, media_type=mjpeg_media_type)
# Default board I2C bus with the TMP117 temperature sensor on it.
i2c = board.I2C()  # uses board.SCL and board.SDA
tmp117 = adafruit_tmp117.TMP117(i2c)

# I2C bus number 3; the SHT4x sensor is looked up there at address 0x46.
i2c3 = I2C(3)
try:
    sht = adafruit_sht4x.SHT4x(i2c3, 0x46)
except Exception as e:
    # The sensor is optional: report why it is missing instead of swallowing
    # the error silently.  The 0 sentinel is kept because get_diffi2c()
    # tests `sht == 0` to detect an absent sensor.
    print(f"SHT4x error: {e}")
    sht = 0
# Do not request the lines upfront; they are requested and released on demand.
chip = gpiod.Chip('gpiochip0')
chip2 = gpiod.Chip('gpiochip2')

# Line handles for the eight outputs, all on gpiochip0.
(output1, output2, output3, output4,
 output5, output6, output7, output8) = [
    chip.get_line(offset) for offset in (12, 13, 18, 19, 20, 21, 22, 26)
]

# Line handles for inputs 1-6 on gpiochip0 ...
(input1, input2, input3, input4, input5, input6) = [
    chip.get_line(offset) for offset in (5, 6, 7, 8, 27, 25)
]
# ... and for inputs 7-8 on gpiochip2.
(input7, input8) = [chip2.get_line(offset) for offset in (9, 10)]
def _write_output(line, switch, label):
    """Drive one output line to the current value of its switch.

    The line is requested as an output, written and released again.  The
    release sits in a ``finally`` so a failing ``set_value()`` cannot leave
    the line requested.

    Args:
        line: gpiod line handle of the output.
        switch: UI switch whose ``value`` is written to the line.
        label: name used in the error message, e.g. ``"Output1"``.
    """
    try:
        line.request(consumer='nicegui', type=gpiod.LINE_REQ_DIR_OUT)
        try:
            line.set_value(switch.value)
        finally:
            line.release()
    except OSError as e:
        print(f"{label} error: {e}")


def set_output1():
    """Apply the state of switch_1 to output 1."""
    _write_output(output1, switch_1, "Output1")


def set_output2():
    """Apply the state of switch_2 to output 2."""
    _write_output(output2, switch_2, "Output2")


def set_output3():
    """Apply the state of switch_3 to output 3."""
    _write_output(output3, switch_3, "Output3")


def set_output4():
    """Apply the state of switch_4 to output 4."""
    _write_output(output4, switch_4, "Output4")


def set_output5():
    """Apply the state of switch_5 to output 5."""
    _write_output(output5, switch_5, "Output5")


def set_output6():
    """Apply the state of switch_6 to output 6."""
    _write_output(output6, switch_6, "Output6")


def set_output7():
    """Apply the state of switch_7 to output 7."""
    _write_output(output7, switch_7, "Output7")


def set_output8():
    """Apply the state of switch_8 to output 8."""
    _write_output(output8, switch_8, "Output8")
def _refresh_input(line, indicator, label):
    """Read one input line and colour its indicator accordingly.

    The line is requested as an input, read and released again.  The release
    sits in a ``finally`` so a failing ``get_value()`` cannot leave the line
    requested.  A truthy level turns the indicator green, otherwise red; on
    an ``OSError`` the indicator is left unchanged and the error is printed.

    Args:
        line: gpiod line handle of the input.
        indicator: UI element whose colour class reflects the level.
        label: name used in the error message, e.g. ``"Input1"``.
    """
    try:
        line.request(consumer='nicegui', type=gpiod.LINE_REQ_DIR_IN)
        try:
            level = line.get_value()
        finally:
            line.release()
    except OSError as e:
        print(f"{label} error: {e}")
        return
    if level:
        indicator.classes('text-green', remove='text-red')
    else:
        indicator.classes('text-red', remove='text-green')


def get_input1():
    """Refresh the indicator of input 1."""
    _refresh_input(input1, input1_indicator, "Input1")


def get_input2():
    """Refresh the indicator of input 2."""
    _refresh_input(input2, input2_indicator, "Input2")


def get_input3():
    """Refresh the indicator of input 3."""
    _refresh_input(input3, input3_indicator, "Input3")


def get_input4():
    """Refresh the indicator of input 4."""
    _refresh_input(input4, input4_indicator, "Input4")


def get_input5():
    """Refresh the indicator of input 5."""
    _refresh_input(input5, input5_indicator, "Input5")


def get_input6():
    """Refresh the indicator of input 6."""
    _refresh_input(input6, input6_indicator, "Input6")


def get_input7():
    """Refresh the indicator of input 7."""
    _refresh_input(input7, input7_indicator, "Input7")


def get_input8():
    """Refresh the indicator of input 8."""
    _refresh_input(input8, input8_indicator, "Input8")
# Register release() of every GPIO line to run when the interpreter exits,
# outputs first and inputs after, in numeric order.
for _gpio_line in (
    output1, output2, output3, output4,
    output5, output6, output7, output8,
    input1, input2, input3, input4,
    input5, input6, input7, input8,
):
    atexit.register(_gpio_line.release)
# Page title followed by one row holding the eight output switches.
ui.markdown('#zeroPLC')
ui.markdown('##24V Outputs')
with ui.row():
    # Each switch calls its own set_outputN handler when toggled.
    (switch_1, switch_2, switch_3, switch_4,
     switch_5, switch_6, switch_7, switch_8) = [
        ui.switch(caption, on_change=handler)
        for caption, handler in (
            ('Output 1', set_output1),
            ('Output 2', set_output2),
            ('Output 3', set_output3),
            ('Output 4', set_output4),
            ('Output 5', set_output5),
            ('Output 6', set_output6),
            ('Output 7', set_output7),
            ('Output 8', set_output8),
        )
    ]
def _build_input_indicators(captions):
    """Create one row with an indicator icon and a label per caption.

    Every icon starts out red ('text-red').  Returns the icons in the same
    order as *captions*.
    """
    icons = []
    with ui.row():
        for caption in captions:
            icon = ui.icon('fiber_manual_record', size='2em')
            icons.append(icon.classes('drop-shadow text-red'))
            ui.label(caption)
    return icons


ui.markdown('##24V Digital Inputs')
(input1_indicator, input2_indicator, input3_indicator, input4_indicator,
 input5_indicator, input6_indicator, input7_indicator, input8_indicator) = (
    _build_input_indicators((
        'Input 1', 'Input 2', 'Input 3', 'Input 4',
        'Input 5', 'Input 6', 'Input 7', 'Input 8',
    ))
)

# Poll every input twice per second, one timer per input.
for _poll_input in (
    get_input1, get_input2, get_input3, get_input4,
    get_input5, get_input6, get_input7, get_input8,
):
    ui.timer(0.5, _poll_input)
# Initial table content: one row per analog input, numbered '1'..'8', at 0.0.
table_rows = [
    {'Input': str(channel), 'Voltage': 0.0} for channel in range(1, 9)
]
# Raw ADC sysfs files in table order: inputs 1-4 are channels 0-3 of
# iio:device1, inputs 5-8 are channels 0-3 of iio:device0.
_ANALOG_RAW_PATHS = tuple(
    f'/sys/bus/iio/devices/iio:device{device}/in_voltage{channel}_raw'
    for device in (1, 0)
    for channel in range(4)
)


def _read_analog_voltage(path):
    """Return the scaled reading of one raw ADC sysfs file."""
    with open(path, 'r') as f:
        # The raw integer is scaled by 3/500 for display.
        # NOTE(review): presumably ADC resolution times the input divider --
        # confirm the factor against the hardware.
        return int(f.read()) * 3 / 500


def get_analog_inputs():
    """Read all eight analog inputs and refresh the voltage table.

    On success the 'Voltage' field of every row in ``table_rows`` is updated
    and the table is redrawn.  If any file cannot be read (``OSError``) or
    does not contain an integer (``ValueError``), the error is printed and
    the table keeps its previous values.
    """
    try:
        voltages = [_read_analog_voltage(path) for path in _ANALOG_RAW_PATHS]
    except (OSError, ValueError) as e:
        print(f"Analog input error: {e}")
        return
    for row, voltage in zip(table_rows, voltages):
        row['Voltage'] = voltage
    table.rows = table_rows
    table.update()
ui.markdown('##10V Analog Inputs')
with ui.row():
    # Two-column table (input number and voltage) initially showing table_rows.
    _table_columns = [
        {'name': title, 'label': title, 'field': title}
        for title in ('Input', 'Voltage')
    ]
    table = ui.table(columns=_table_columns, rows=table_rows)
# Refresh the readings twice per second.
ui.timer(0.5, get_analog_inputs)
ui.markdown('##Communication')
ui.markdown('####Onewire')


def get_onewire():
    """Show the DS18B20 one-wire temperature, or "not connected".

    Reads the sysfs ``temperature`` file of device ``28-ca795e1e64ff`` and
    divides its integer content by 1000 for display in °C.
    """
    try:
        with open('/sys/bus/w1/devices/28-ca795e1e64ff/temperature', 'r') as f:
            value = int(f.read()) / 1000
    except (OSError, ValueError):
        # OSError: the sensor file cannot be opened or read.
        # ValueError: the file content is not an integer.
        onewirelabel.set_text("not connected")
    else:
        onewirelabel.set_text("DS18B20 onewire sensor: %.2f °C" % value)


with ui.row():
    onewirelabel = ui.label()
# Refresh the one-wire reading once per second.
ui.timer(1, get_onewire)
ui.markdown('####I2C')
with ui.row():
    tmp117label = ui.label()


def _show_tmp117():
    """Write the current TMP117 temperature into its label."""
    reading = tmp117.temperature
    tmp117label.set_text("Onboard TMP117 temperature: %.2f °C" % reading)


# Refresh the TMP117 reading once per second.
ui.timer(1, _show_tmp117)
ui.markdown('####Differential I2C')


def get_diffi2c():
    """Show the SHT40 temperature and humidity, or "not connected".

    ``sht`` equals 0 when the sensor was not found at start-up.  A read that
    fails later (``OSError`` or ``RuntimeError``) is printed and shown as
    "not connected" as well, instead of raising out of the timer callback.
    """
    if sht == 0:
        sht40label.set_text("not connected")
        return
    try:
        temperature, relative_humidity = sht.measurements
    except (OSError, RuntimeError) as e:
        print(f"SHT40 error: {e}")
        sht40label.set_text("not connected")
        return
    sht40label.set_text(
        "SHT40 sensor: %.2f °C %0.1f %%" % (temperature, relative_humidity)
    )


with ui.row():
    sht40label = ui.label()
# Refresh the SHT40 reading once per second.
ui.timer(1, get_diffi2c)
# Serve the local 'static' directory under the /static URL.
# NOTE(review): 'static' is a relative path, so it presumably resolves against
# the working directory the script is started from -- confirm for deployment.
app.add_static_files('/static', 'static')

# Page background: the zeroPLC picture, centred, scaled to fit, not repeated.
_body_background_css = (
    "background-image: url('/static/zeroplc.jpg');"
    "background-size: contain;"
    "background-position: center;"
    "background-repeat: no-repeat;"
)
ui.query("body").style(_body_background_css)
def main():
    """Start the camera if possible, add the camera section and run the UI.

    The module-level ``camera_connected`` flag is set to 1 when
    ``init_camera()`` succeeds and to 0 otherwise.  A camera failure is
    printed and the page shows "No camera connected" instead of the live
    view; it never prevents the UI from starting.
    """
    # Without this declaration the assignments below would only create a
    # local variable and the module-level flag would stay at 0.
    global camera_connected
    try:
        init_camera()
    except Exception as e:
        print(f"Camera error: {e}")
        camera_connected = 0
    else:
        camera_connected = 1
    ui.markdown('####Camera')
    with ui.row().classes('w-full justify-left'):
        if camera_connected != 0:
            ui.html('''
<div>
<img src="/video_feed" style="width: 100%; height: auto;" />
</div>
''')
        else:
            ui.html('<h3>No camera connected</h3>')
    ui.run(reload=False)


if __name__ == '__main__':
    main()