Commit 854d2864 authored by 徐生海's avatar 徐生海

Initial commit

parent 021a993e
Pipeline #179 failed with stages
...@@ -227,7 +227,55 @@ class AppUtils: ...@@ -227,7 +227,55 @@ class AppUtils:
else: else:
return None return None
@staticmethod
def IsSocketPortAvailable(address: str, port: int) -> bool:
"""判断当前IP 和 端口是否可联接"""
try:
if not isinstance(address, str) or not isinstance(port, (int, float)):
return False
Socket: Qt.QTcpSocket = Qt.QTcpSocket()
Socket.setPeerAddress(Qt.QHostAddress(address))
Socket.setPeerPort(int(port))
IsAvailable = Socket.waitForConnected(1000)
if IsAvailable:
Socket.disconnectFromHost()
return True
else:
return False
except Exception as e:
AppUtils.print_Exception(e)
return False
@staticmethod
def IsServerAvailable(url_string: str) -> bool:
"""判断当前IP 和 端口是否可联接"""
try:
url: Qt.QUrl = Qt.QUrl(url_string)
url_authority: list[str] = url.authority().split(":")
if len(url_authority) < 2:
HostAddr = "0.0.0.0"
HostPort = 9991
else:
try:
HostAddr, HostPort = url_authority[0], AppConvert.ObjectConvert(url_authority[1], int)
except Exception as e:
HostAddr = "0.0.0.0"
HostPort = 9991
if not isinstance(HostAddr, str):
HostAddr = "0.0.0.0"
if not isinstance(HostPort, (int, float)):
HostPort = 9991
Socket: Qt.QTcpServer = Qt.QTcpServer()
IsAvailable = Socket.listen(Qt.QHostAddress(HostAddr), int(HostPort))
if IsAvailable:
Socket.close()
return True
else:
return False
except Exception as e:
AppUtils.print_Exception(e)
return False
def log_exceptions(func): def log_exceptions(func):
......
...@@ -16,45 +16,50 @@ from AppInterfaces.globalSlot import AppSlot ...@@ -16,45 +16,50 @@ from AppInterfaces.globalSlot import AppSlot
from AppInterfaces.AppDaemonController import AppDaemonController, DaemonController from AppInterfaces.AppDaemonController import AppDaemonController, DaemonController
if __name__ == "__main__": if __name__ == "__main__":
app = Qt.QApplication(sys.argv) app = Qt.QApplication(sys.argv)
AppSlot.initApplicationConnect(app) """使用异步"""
loop = QEventLoop(app) loop = QEventLoop(app)
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
DaemonController.Manager = loop DaemonController.Manager = loop
from AppSettings.AppSetting import AppSetting, AppConfig from AppCore.Contexts import *
from AppUtils.AppUtils import AppUtils
from AppUtils.AppConvert import AppConvert
from AppSettings.AppSetting import AppConfig
"""加载各模块参数"""
AppConfig.reload() AppConfig.reload()
"""判断服务端口是否可用 如果不可用 直接退出"""
if AppUtils.IsServerAvailable(AppConfig.WebConfig.API):
from AppInterfaces.bases import *
from AppUtils.DataBaseDeploy import DataBaseDeploy from AppUtils.DataBaseDeploy import DataBaseDeploy
from AppSettings.AppStatusSetting import AppStatus
from AppSettings.AppGlobalStatus import AppStatus
from AppSettings.SystemSetting import SystemConfig from AppSettings.SystemSetting import SystemConfig
from AppSettings.HitBotSetting import HitBotSetting, HitBotConfig from AppSettings.HitBotSetting import HitBotSetting, HitBotConfig
from AppBackEnd.AppEventBusBusiness import * from AppBackEnd.AppEventBusBusiness import *
from AppCore.AppDBModels.DBModels import * from AppCore.AppDBModels.DBModels import *
DataBaseDeploy.init () DataBaseDeploy.init()
AppStatus.UpdateFromDBModel()
AppStatus.UpdateMaxLocation () HitBotConfig.UpdateFromDBModel()
SystemConfig.UpdateFromDBModel()
SystemConfig.UpdateMaxLocation() TrayModel.UpdateModel()
ImageConfigModel.UpdateMaxLocation()
HitBotConfig.UpdateMaxLocation()
from AppCore.AppManagers.HardWareManager import HardWareManager from AppCore.AppManagers.HardWareManager import HardWareManager
from AppCore.Readers import ModBusPool from AppCore.Readers import ModBusPool
from AppBackEnd.AppScanningFlow import AppScanningFlow from AppBackEnd.AppScanningFlow import AppScanningFlow
from AppHeadEnd.Frames.MainWindow import MainWindow from AppHeadEnd.Frames.MainWindow import MainWindow
from AppHeadEnd.UiViews import *
AppScanningFlow.GetInstance()
AppStatus.waitForRunning() AppStatus.waitForRunning()
widget: MainWindow = MainWindow() widget: MainWindow = MainWindow()
widget.show() widget.show()
ModBusPool.GetInstance().ReOpen() ModBusPool.GetInstance().ReOpen()
HardWareManager.GetInstance().initDevices() HardWareManager.GetInstance().initDevices()
AppScanningFlow.GetInstance().initModules() AppScanningFlow.GetInstance().initModules()
print('扫描系统启动成功') print('扫描系统启动成功')
DaemonController.Start(loop) DaemonController.Start(loop)
sys.exit(app.exec()) sys.exit(app.exec())
else:
print("端口被占用,请检查!")
sys.exit()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment