69 lines
2.3 KiB
Python
69 lines
2.3 KiB
Python
import time
|
|
from threading import Thread
|
|
from screeninfo import get_monitors
|
|
import pyautogui
|
|
import numpy as np
|
|
import cv2 as cv
|
|
from core.Logger import Logger
|
|
|
|
# Minimum number of seconds between refreshes of Monitor.fps in the capture loop.
FPS_REPORT_DELAY = 3
|
|
|
|
class Monitor:
    """Capture the screen on a background thread and locate templates in it.

    The capture loop stores the most recent grayscale frame in
    ``self.screenshot`` and mirrors it to ``assets/screenshot.jpg``.
    ``findMatch`` runs OpenCV template matching against that frame.
    """

    # Longest time, in seconds, stopScreenCaptureThread() waits for the loop
    # to finish its current iteration before giving up.
    _STOP_TIMEOUT_SECONDS = 5.0

    # File the capture loop writes every frame to; findMatch() falls back to
    # it when no in-memory frame exists yet.
    _SCREENSHOT_PATH = "assets/screenshot.jpg"

    # Names of the matchTemplate methods whose best match is the *lowest* score.
    _MINIMUM_IS_BEST = ("TM_SQDIFF", "TM_SQDIFF_NORMED")

    def __init__(self):
        """Create the logger, reset capture state and detect the monitor."""
        # Local import: the module only imports ``Thread`` from threading.
        from threading import Event

        self.logger = Logger("Monitor")
        self.monitor = None
        self.screenThread = None
        self.screenshot = None
        self.fps = None
        # Set to ask the capture loop to exit; cleared when a new thread starts.
        self._stopEvent = Event()
        self.getMonitor()

    def getMonitor(self):
        """Store the primary monitor in ``self.monitor``.

        Falls back to the first reported monitor when none is flagged as
        primary. ``self.monitor`` is left untouched when no monitor is
        reported at all.
        """
        monitors = list(get_monitors())
        chosen = next((m for m in monitors if m.is_primary), None)
        if chosen is None and monitors:
            chosen = monitors[0]
            self.logger.log("Monitor: No primary monitor reported, using the first one")
        if chosen is not None:
            self.monitor = chosen

    def updateScreen(self):
        """Run the capture loop until a stop is requested.

        Each iteration grabs a screenshot, converts it to grayscale, publishes
        it as ``self.screenshot`` and writes it to ``assets/screenshot.jpg``.
        ``self.fps`` is refreshed at most once every ``FPS_REPORT_DELAY``
        seconds from the duration of the latest iteration.
        """
        self.logger.log("Monitor: Starting computer vision screen update...")
        if self.monitor is not None:
            self.logger.log(
                "Monitor: Detected display resolution: "
                f"{self.monitor.width} x {self.monitor.height}"
            )

        loopTime = time.time()
        fpsPrintTime = loopTime
        while not self._stopEvent.is_set():
            # pyautogui returns a PIL image; np.array exposes it in RGB channel
            # order, hence RGB2GRAY (IMREAD_GRAYSCALE is an imread flag, not a
            # colour-conversion code).
            frame = np.array(pyautogui.screenshot())
            grayFrame = cv.cvtColor(frame, cv.COLOR_RGB2GRAY)
            # Publish with a single assignment so readers never see a
            # half-built frame.
            self.screenshot = grayFrame
            cv.imwrite(self._SCREENSHOT_PATH, grayFrame)

            currTime = time.time()
            frameDuration = currTime - loopTime
            if currTime - fpsPrintTime >= FPS_REPORT_DELAY:
                # Guard against a zero-length interval on coarse clocks.
                if frameDuration > 0:
                    self.fps = 1 / frameDuration
                fpsPrintTime = currTime
            loopTime = currTime

    def startScreenCaptureThread(self):
        """Start the capture loop on a daemon thread, unless one is already running."""
        if self.screenThread is not None and self.screenThread.is_alive():
            self.logger.log("Monitor: Screen capture thread is already running")
            return

        # Clear before starting so a stop request issued right after start()
        # cannot be lost.
        self._stopEvent.clear()
        self.screenThread = Thread(
            target=self.updateScreen,
            name="Update screen thread",
            daemon=True,
        )
        self.screenThread.start()
        self.logger.log("Main Agent: Thread started")

    def findMatch(self, template, method="TM_CCOEFF_NORMED"):
        """Return the centre of the best match of ``template`` on the screen.

        Args:
            template: Grayscale template image as a 2-D array.
            method: Name of an OpenCV template-matching constant, looked up on
                the ``cv2`` module (for example ``"TM_SQDIFF_NORMED"``).

        Returns:
            ``(x, y)`` floats: the centre of the best-matching rectangle in
            screenshot pixel coordinates.

        Raises:
            AssertionError: If ``template`` is None.
            AttributeError: If ``method`` is not an attribute of ``cv2``.
            RuntimeError: If no screenshot is available in memory or on disk.
        """
        # Kept as AssertionError so callers see the same exception type as
        # before; raised explicitly so the check survives ``python -O``.
        if template is None:
            raise AssertionError("file could not be read, check with os.path.exists()")

        methodInt = getattr(cv, method)

        # Prefer the in-memory frame: the file on disk is rewritten by the
        # capture thread and may be mid-write when read.
        screen = self.screenshot
        if screen is None:
            screen = cv.imread(self._SCREENSHOT_PATH, cv.IMREAD_GRAYSCALE)
        if screen is None:
            raise RuntimeError(
                "no screenshot available: start the capture thread or provide "
                + self._SCREENSHOT_PATH
            )

        scores = cv.matchTemplate(screen, template, methodInt)
        height, width = template.shape[:2]
        _, _, minLoc, maxLoc = cv.minMaxLoc(scores)

        # ``method`` is a name, so compare names: for the squared-difference
        # methods the best match is the minimum, otherwise the maximum.
        left, top = minLoc if method in self._MINIMUM_IS_BEST else maxLoc

        return left + width / 2, top + height / 2

    def stopScreenCaptureThread(self):
        """Ask the capture loop to exit and wait for its thread to finish.

        Does nothing when no thread was started. If the thread has not
        finished within ``_STOP_TIMEOUT_SECONDS`` it is left referenced in
        ``self.screenThread`` so a later call can wait for it again.
        """
        worker = self.screenThread
        if worker is None:
            return

        # Threads cannot be killed from outside; signal the loop and wait.
        self._stopEvent.set()
        worker.join(timeout=self._STOP_TIMEOUT_SECONDS)
        if worker.is_alive():
            self.logger.log("Monitor: Screen capture thread did not stop in time")
            return

        self.screenThread = None
        self.logger.log("Main Agent: Thread terminated")
|
|
|
|
|