106 lines
3.3 KiB
Python
106 lines
3.3 KiB
Python
import time
from inspect import isfunction
from threading import Event, Thread

import cv2 as cv
import numpy as np
import pyautogui
from screeninfo import get_monitors

from core.Logger import Logger
|
|
|
|
# Minimum number of seconds between two refreshes of Monitor.fps in the
# screen capture loop (compared against time.time() differences).
FPS_REPORT_DELAY = 3
|
|
|
|
class Monitor:
    """Capture the screen in a background thread and locate templates on it.

    A daemon thread keeps ``self.screenshot`` updated with the latest
    grayscale frame; the ``find*`` helpers run OpenCV template matching
    against that frame and optionally move the mouse / click via pyautogui.
    """

    # Minimum score (in percent) for findMatch to report a hit.
    MATCH_THRESHOLD = 50
    # Maximum number of seconds stopScreenCaptureThread waits for the
    # capture loop to finish its current iteration.
    STOP_TIMEOUT = 5

    def __init__(self, config):
        """Store the configuration, create the logger and pick a monitor.

        Args:
            config: Object exposing a ``file`` mapping with a ``"debug"`` key.
        """
        self.config = config
        self.isDebug = self.config.file["debug"]
        self.logger = Logger("Monitor", self.isDebug)
        self.monitor = None
        self.screenThread = None
        self.screenshot = None
        self.fps = None
        # Set to ask the capture loop to exit; threading.Thread cannot be
        # terminated from the outside, so the loop has to cooperate.
        self._stopEvent = Event()
        self.getMonitor()

    def getMonitor(self):
        """Select the monitor flagged as primary and store it in ``self.monitor``.

        When no monitor reports ``is_primary``, the first enumerated monitor
        is used instead so that ``self.monitor`` is only left as ``None``
        when no monitor is reported at all.
        """
        monitors = list(get_monitors())
        selected = None
        for monitor in monitors:
            if monitor.is_primary:
                # No break: like the original loop, the last primary wins.
                selected = monitor
        if selected is None and monitors:
            selected = monitors[0]
        if selected is not None:
            self.monitor = selected

    def updateScreen(self):
        """Run the capture loop until a stop is requested.

        Each iteration grabs a screenshot, converts it to grayscale and
        publishes it as ``self.screenshot``. In debug mode the frame is also
        written to ``assets/screenshot.jpg``. ``self.fps`` is refreshed at
        most once every ``FPS_REPORT_DELAY`` seconds.
        """
        self.logger.log("Monitor: Starting computer vision screen update...")
        if self.monitor is not None:
            self.logger.log("Monitor: Detected display resolution: " + str(self.monitor.width) + " x " + str(self.monitor.height))

        loopTime = time.time()
        fpsPrintTime = time.time()
        while not self._stopEvent.is_set():
            frame = np.array(pyautogui.screenshot())
            # Single RGB -> gray pass; equivalent to RGB -> BGR -> gray but
            # avoids a full-frame intermediate conversion.
            self.screenshot = cv.cvtColor(frame, cv.COLOR_RGB2GRAY)

            if self.isDebug is True:
                cv.imwrite("assets/screenshot.jpg", self.screenshot)

            currTime = time.time()
            if currTime - fpsPrintTime >= FPS_REPORT_DELAY:
                elapsed = currTime - loopTime
                # Guard against two identical clock reads (coarse timers).
                if elapsed > 0:
                    self.fps = 1 / elapsed
                fpsPrintTime = currTime
            # Updated every iteration so `elapsed` is the last frame time.
            loopTime = currTime

    def startScreenCaptureThread(self):
        """Start the background capture thread unless one is already running."""
        if self.screenThread is not None and self.screenThread.is_alive():
            self.logger.debug("Screen capture thread is already running")
            return
        # Clear a previous stop request so the thread can be restarted.
        self._stopEvent.clear()
        self.screenThread = Thread(target=self.updateScreen, name="Update screen thread", daemon=True)
        self.screenThread.start()
        self.logger.log("Main Agent: Thread started")

    # TODO: GET IMAGE BEST MATCH RATIO FOR A WHOLE DIRECTORY OF ASSETS
    def findBestMatchFromDir(self, dirPath):
        """Placeholder, not implemented yet; currently returns ``None``."""
        pass

    def findMatch(self, template, method="TM_CCOEFF_NORMED"):
        """Locate ``template`` in the latest screenshot.

        Args:
            template: 2-D (grayscale) image array to search for.
            method: Name of an OpenCV template matching constant, e.g.
                ``"TM_CCOEFF_NORMED"`` or ``"TM_SQDIFF_NORMED"``.

        Returns:
            ``(x, y)`` coordinates of the centre of the best match, or ``-1``
            when no screenshot is available yet or the score is below
            ``MATCH_THRESHOLD`` percent. The percentage is only meaningful
            for the ``*_NORMED`` methods, whose results are scaled.
        """
        if self.screenshot is None:
            # The capture thread has not produced a frame yet.
            self.logger.debug("No screenshot captured yet...")
            return -1

        methodInt = getattr(cv, method)
        res = cv.matchTemplate(self.screenshot, template, methodInt)
        w, h = template.shape[::-1]
        min_val, max_val, min_loc, max_loc = cv.minMaxLoc(res)

        # For TM_SQDIFF / TM_SQDIFF_NORMED the best match is the minimum, so
        # both the score and the location come from the minimum. The check
        # uses the resolved constant: `method` itself is a string.
        if methodInt in (cv.TM_SQDIFF, cv.TM_SQDIFF_NORMED):
            matchRatio = (1 - min_val) * 100
            top_left = min_loc
        else:
            matchRatio = max_val * 100
            top_left = max_loc
        self.logger.debug(matchRatio)

        if matchRatio < self.MATCH_THRESHOLD:
            self.logger.debug("Cannot find matching result...")
            return -1

        bottom_right = (top_left[0] + w, top_left[1] + h)
        return (top_left[0] + bottom_right[0]) / 2, (top_left[1] + bottom_right[1]) / 2

    def findMatchAndMoveToPosition(self, template, method="TM_CCOEFF_NORMED"):
        """Move the mouse to the centre of the best match for ``template``.

        Returns:
            ``-1`` when no match was found, otherwise ``None``.
        """
        point = self.findMatch(template, method)
        if point == -1:
            return -1
        time.sleep(0.2)
        pyautogui.moveTo(point[0], point[1])
        time.sleep(0.15)
        return None

    def findMatchAndClickIfAvailable(self, template, onFound=None, onNotFound=None):
        """Click on ``template`` if it is on screen and run the matching callback.

        Args:
            template: Image array to search for (default matching method).
            onFound: Optional callable invoked with the move result after the
                click.
            onNotFound: Optional callable invoked with ``-1`` when the
                template is not found.

        Returns:
            The return value of the invoked callback, or ``None`` when no
            callback applies.
        """
        res = self.findMatchAndMoveToPosition(template)

        if res == -1:
            time.sleep(0.3)
            # callable() also accepts bound methods and functools.partial,
            # which inspect.isfunction would silently ignore.
            if callable(onNotFound):
                return onNotFound(res)
            return None

        time.sleep(0.1)
        pyautogui.click()
        time.sleep(0.15)
        if callable(onFound):
            return onFound(res)
        return None

    def stopScreenCaptureThread(self):
        """Ask the capture thread to stop and wait for it to finish.

        Does nothing when no thread was started. Waits at most
        ``STOP_TIMEOUT`` seconds; if the thread is still running after that,
        it is kept in ``self.screenThread`` so a later call can retry.
        """
        thread = self.screenThread
        if not thread:
            return
        self._stopEvent.set()
        thread.join(timeout=self.STOP_TIMEOUT)
        if thread.is_alive():
            self.logger.log("Main Agent: Thread did not stop in time")
            return
        self.screenThread = None
        self.logger.log("Main Agent: Thread terminated")
|
|
|
|
|