#!/usr/bin/env bash '''exec' "$HOME/zeroPLCgui/.venv/bin/python3" "$0" "$@" ' ''' from nicegui import ui,app import os,sys import gpiod import atexit import board import adafruit_tmp117 import adafruit_sht4x from adafruit_extended_bus import ExtendedI2C as I2C from picamera2 import Picamera2 from fastapi.responses import StreamingResponse import io from PIL import Image import time picam2 = None camera_connected = 0 def init_camera(): global picam2 picam2 = Picamera2() picam2.configure(picam2.create_preview_configuration(main={"format": 'XRGB8888', "size": (640, 480)})) picam2.start() def generate_mjpeg(): while True: img = picam2.capture_array() pil_img = Image.fromarray(img) if pil_img.mode == 'RGBA': pil_img = pil_img.convert('RGB') buf = io.BytesIO() pil_img.save(buf, format='JPEG') frame = buf.getvalue() yield ( b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n' ) time.sleep(0.1) @app.get('/video_feed') def video_feed(): return StreamingResponse(generate_mjpeg(), media_type='multipart/x-mixed-replace; boundary=frame') i2c = board.I2C() # uses board.SCL and board.SDA tmp117 = adafruit_tmp117.TMP117(i2c) i2c3=I2C(3) try: sht = adafruit_sht4x.SHT4x(i2c3,0x46) except: sht = 0 # Do not request the line upfront; request and release on demand chip = gpiod.Chip('gpiochip0') chip2 = gpiod.Chip('gpiochip2') output1 = chip.get_line(12) output2 = chip.get_line(13) output3 = chip.get_line(18) output4 = chip.get_line(19) output5 = chip.get_line(20) output6 = chip.get_line(21) output7 = chip.get_line(22) output8 = chip.get_line(26) input1 = chip.get_line(5) input2 = chip.get_line(6) input3 = chip.get_line(7) input4 = chip.get_line(8) input5 = chip.get_line(27) input6 = chip.get_line(25) input7 = chip2.get_line(9) input8 = chip2.get_line(10) def set_output1(): try: output1.request(consumer='nicegui', type=gpiod.LINE_REQ_DIR_OUT) output1.set_value(switch_1.value) output1.release() except OSError as e: print(f"Output1 error: {e}") def set_output2(): try: output2.request(consumer='nicegui', type=gpiod.LINE_REQ_DIR_OUT) output2.set_value(switch_2.value) output2.release() except OSError as e: print(f"Output2 error: {e}") def set_output3(): try: output3.request(consumer='nicegui', type=gpiod.LINE_REQ_DIR_OUT) output3.set_value(switch_3.value) output3.release() except OSError as e: print(f"Output3 error: {e}") def set_output4(): try: output4.request(consumer='nicegui', type=gpiod.LINE_REQ_DIR_OUT) output4.set_value(switch_4.value) output4.release() except OSError as e: print(f"Output4 error: {e}") def set_output5(): try: output5.request(consumer='nicegui', type=gpiod.LINE_REQ_DIR_OUT) output5.set_value(switch_5.value) output5.release() except OSError as e: print(f"Output5 error: {e}") def set_output6(): try: output6.request(consumer='nicegui', type=gpiod.LINE_REQ_DIR_OUT) output6.set_value(switch_6.value) output6.release() except OSError as e: print(f"Output6 error: {e}") def set_output7(): try: output7.request(consumer='nicegui', type=gpiod.LINE_REQ_DIR_OUT) output7.set_value(switch_7.value) output7.release() except OSError as e: print(f"Output7 error: {e}") def set_output8(): try: output8.request(consumer='nicegui', type=gpiod.LINE_REQ_DIR_OUT) output8.set_value(switch_8.value) output8.release() except OSError as e: print(f"Output8 error: {e}") def get_input1(): try: input1.request(consumer='nicegui', type=gpiod.LINE_REQ_DIR_IN) if input1.get_value(): input1_indicator.classes('text-green', remove='text-red') else: input1_indicator.classes('text-red', remove='text-green') input1.release() except OSError as e: print(f"Input1 error: {e}") def get_input2(): try: input2.request(consumer='nicegui', type=gpiod.LINE_REQ_DIR_IN) if input2.get_value(): input2_indicator.classes('text-green', remove='text-red') else: input2_indicator.classes('text-red', remove='text-green') input2.release() except OSError as e: print(f"Input2 error: {e}") def get_input3(): try: input3.request(consumer='nicegui', type=gpiod.LINE_REQ_DIR_IN) if input3.get_value(): input3_indicator.classes('text-green', remove='text-red') else: input3_indicator.classes('text-red', remove='text-green') input3.release() except OSError as e: print(f"Input3 error: {e}") def get_input4(): try: input4.request(consumer='nicegui', type=gpiod.LINE_REQ_DIR_IN) if input4.get_value(): input4_indicator.classes('text-green', remove='text-red') else: input4_indicator.classes('text-red', remove='text-green') input4.release() except OSError as e: print(f"Input4 error: {e}") def get_input5(): try: input5.request(consumer='nicegui', type=gpiod.LINE_REQ_DIR_IN) if input5.get_value(): input5_indicator.classes('text-green', remove='text-red') else: input5_indicator.classes('text-red', remove='text-green') input5.release() except OSError as e: print(f"Input5 error: {e}") def get_input6(): try: input6.request(consumer='nicegui', type=gpiod.LINE_REQ_DIR_IN) if input6.get_value(): input6_indicator.classes('text-green', remove='text-red') else: input6_indicator.classes('text-red', remove='text-green') input6.release() except OSError as e: print(f"Input6 error: {e}") def get_input7(): try: input7.request(consumer='nicegui', type=gpiod.LINE_REQ_DIR_IN) if input7.get_value(): input7_indicator.classes('text-green', remove='text-red') else: input7_indicator.classes('text-red', remove='text-green') input7.release() except OSError as e: print(f"Input7 error: {e}") def get_input8(): try: input8.request(consumer='nicegui', type=gpiod.LINE_REQ_DIR_IN) if input8.get_value(): input8_indicator.classes('text-green', remove='text-red') else: input8_indicator.classes('text-red', remove='text-green') input8.release() except OSError as e: print(f"Input8 error: {e}") atexit.register(lambda: output1.release()) atexit.register(lambda: output2.release()) atexit.register(lambda: output3.release()) atexit.register(lambda: output4.release()) atexit.register(lambda: output5.release()) atexit.register(lambda: output6.release()) atexit.register(lambda: output7.release()) atexit.register(lambda: output8.release()) atexit.register(lambda: input1.release()) atexit.register(lambda: input2.release()) atexit.register(lambda: input3.release()) atexit.register(lambda: input4.release()) atexit.register(lambda: input5.release()) atexit.register(lambda: input6.release()) atexit.register(lambda: input7.release()) atexit.register(lambda: input8.release()) ui.markdown('#zeroPLC') ui.markdown('##24V Outputs') with ui.row(): switch_1 = ui.switch('Output 1', on_change=set_output1) switch_2 = ui.switch('Output 2', on_change=set_output2) switch_3 = ui.switch('Output 3', on_change=set_output3) switch_4 = ui.switch('Output 4', on_change=set_output4) switch_5 = ui.switch('Output 5', on_change=set_output5) switch_6 = ui.switch('Output 6', on_change=set_output6) switch_7 = ui.switch('Output 7', on_change=set_output7) switch_8 = ui.switch('Output 8', on_change=set_output8) ui.markdown('##24V Digital Inputs') with ui.row(): input1_indicator = ui.icon('fiber_manual_record', size='2em').classes('drop-shadow text-red') ui.label('Input 1') input2_indicator = ui.icon('fiber_manual_record', size='2em').classes('drop-shadow text-red') ui.label('Input 2') input3_indicator = ui.icon('fiber_manual_record', size='2em').classes('drop-shadow text-red') ui.label('Input 3') input4_indicator = ui.icon('fiber_manual_record', size='2em').classes('drop-shadow text-red') ui.label('Input 4') input5_indicator = ui.icon('fiber_manual_record', size='2em').classes('drop-shadow text-red') ui.label('Input 5') input6_indicator = ui.icon('fiber_manual_record', size='2em').classes('drop-shadow text-red') ui.label('Input 6') input7_indicator = ui.icon('fiber_manual_record', size='2em').classes('drop-shadow text-red') ui.label('Input 7') input8_indicator = ui.icon('fiber_manual_record', size='2em').classes('drop-shadow text-red') ui.label('Input 8') ui.timer(0.5, get_input1) ui.timer(0.5, get_input2) ui.timer(0.5, get_input3) ui.timer(0.5, get_input4) ui.timer(0.5, get_input5) ui.timer(0.5, get_input6) ui.timer(0.5, get_input7) ui.timer(0.5, get_input8) table_rows = [ {'Input': str(i + 1), 'Voltage': 0.0} for i in range(8) ] # Example function to update voltages def get_analog_inputs(): value = [0] * 8 with open('/sys/bus/iio/devices/iio:device1/in_voltage0_raw', 'r') as f: value[0]=int(f.read())*3/500 with open('/sys/bus/iio/devices/iio:device1/in_voltage1_raw', 'r') as f: value[1]=int(f.read())*3/500 with open('/sys/bus/iio/devices/iio:device1/in_voltage2_raw', 'r') as f: value[2]=int(f.read())*3/500 with open('/sys/bus/iio/devices/iio:device1/in_voltage3_raw', 'r') as f: value[3]=int(f.read())*3/500 with open('/sys/bus/iio/devices/iio:device0/in_voltage0_raw', 'r') as f: value[4]=int(f.read())*3/500 with open('/sys/bus/iio/devices/iio:device0/in_voltage1_raw', 'r') as f: value[5]=int(f.read())*3/500 with open('/sys/bus/iio/devices/iio:device0/in_voltage2_raw', 'r') as f: value[6]=int(f.read())*3/500 with open('/sys/bus/iio/devices/iio:device0/in_voltage3_raw', 'r') as f: value[7]=int(f.read())*3/500 for i in range(8): table_rows[i]['Voltage'] = value[i] table.rows = table_rows table.update() ui.markdown('##10V Analog Inputs') with ui.row(): # Create the table table = ui.table( columns=[ {'name': 'Input', 'label': 'Input', 'field': 'Input'}, {'name': 'Voltage', 'label': 'Voltage', 'field': 'Voltage'}, ], rows=table_rows, ) ui.timer(0.5, get_analog_inputs) ui.markdown('##Communication') ui.markdown('####Onewire') def get_onewire(): try: with open('/sys/bus/w1/devices/28-ca795e1e64ff/temperature', 'r') as f: value=int(f.read())/1000 onewirelabel.set_text("DS18B20 onewire sensor: %.2f °C"%value) except: onewirelabel.set_text("not connected") with ui.row(): onewirelabel = ui.label() ui.timer(1, get_onewire) ui.markdown('####I2C') with ui.row(): tmp117label = ui.label() ui.timer(1, lambda: tmp117label.set_text("Onboard TMP117 temperature: %.2f °C"%tmp117.temperature)) ui.markdown('####Differential I2C') def get_diffi2c(): if sht == 0: sht40label.set_text("not connected") else: temperature, relative_humidity = sht.measurements sht40label.set_text("SHT40 sensor: %.2f °C %0.1f %%"%(temperature, relative_humidity)) with ui.row(): sht40label = ui.label() ui.timer(1, get_diffi2c) app.add_static_files('/static', 'static') ui.query("body").style( "background-image: url('/static/zeroplc.jpg');" "background-size: contain;" "background-position: center;" "background-repeat: no-repeat;" ) def main(): try: init_camera() camera_connected = 1 except: camera_connected = 0 ui.markdown('####Camera') with ui.row().classes('w-full justify-left'): if camera_connected != 0: ui.html('''