思路:
1.利用萤石云监控,放置摄像头在小孩学习桌前,监控小孩是否在学习桌前学习;
2.利用Mumu模拟器,通过对萤石app的截图,每10秒采集孩子的学习照片;
3.通过Pytorch深度学习,对采集的图片进行训练、测试,判断是否孩子在学习桌前;
4.通过Python操作Mumu模拟器,对孩子不在学习桌前,点击App的响铃按钮,实施对小孩的监控,并且用Sqlite3数据库记录小孩的学习记录。
采集照片的代码:
# -*- coding: utf-8 -*-
"""
Created on Sun Dec 22 11:01:23 2024
@author: YBK
"""
import os
import random
import time
import cv2
import numpy as np
import win32api
from apscheduler.schedulers.blocking import BlockingScheduler
from skimage.metrics import structural_similarity as ssim
#连接mumu模拟器 首先先断开服务再执行连接
def Connection():
cmd = 'adb kill-server'
tmp = os.popen(cmd).readlines()
cmd = 'adb connect 127.0.0.1:16384'
tmp = os.popen(cmd).readlines()
print(tmp)
#模拟点击 这个是代码的核心 也是繁琐的根源
def Click(x,y):
cmd = 'adb shell input tap {x1} {y1}'.format(x1 = x,y1 = y)
print(cmd)
os.system(cmd)
#通过adb进行截图
def Screenshot(jpgname):
if jpgname is None:
jpgname = nowtime_to_str()
os.system(f'adb shell screencap -p /sdcard/{jpgname}.png')
os.system(f'adb pull /sdcard/{jpgname}.png e:/sb/{jpgname}.png')
# 读取图片
image = cv2.imread(f'e:/sb/{jpgname}.png', cv2.IMREAD_UNCHANGED)
# 设定剪裁区域的坐标和尺寸
x, y, w, h = 150, 0, 690, 540 # 示例坐标和尺寸
# 使用numpy数组进行剪裁
cropped_image = image[y:y+h, x:x+w]
# 保存剪裁后的图片
cv2.imwrite(f'e:/sb/{jpgname}.jpg', cropped_image, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
file_path = f'e:/sb/{jpgname}.png'
if os.path.exists(file_path): # 检查文件是否存在
os.remove(file_path) # 删除文件
else:
print(f"文件 {file_path} 不存在")
# 读取图片
imageA = cv2.imread(f'e:/sb/{jpgname}.jpg', cv2.IMREAD_GRAYSCALE) # 图片路径替换为你的图片路径
imageB = cv2.imread(r'E:\sb\chick.jpg', cv2.IMREAD_GRAYSCALE) # 图片路径替换为你的图片路径
imageC = cv2.imread(r'E:\sb\chick0.jpg', cv2.IMREAD_GRAYSCALE)
# 确保图片大小相同
imageA0 = cv2.resize(imageA, (imageB.shape[1], imageB.shape[0]))
imageA1 = cv2.resize(imageA, (imageC.shape[1], imageC.shape[0]))
# 计算SSIM
(score, diff) = ssim(imageA0, imageB, full=True)
diff = (diff * 255).astype("uint8")
(score1, diff) = ssim(imageA1, imageC, full=True)
print(f"SSIM: {score} , {score1}")
if score > 0.98:
Click(352, 308)
time.sleep(1)
if score1 > 0.98:
Click(452, 293)
time.sleep(1)
os.system(f'adb shell rm /sdcard/{jpgname}.png')
#Click(446, 867) #响铃
#自动启动mumu模拟器
def Lon():
win32api.ShellExecute(0, 'open', r'D:\Program Files\Netease\MuMuPlayer-12.0\shell\MuMuPlayer.exe', '', '', 1)
time.sleep(25)
Connection()
Click(1349,674)
time.sleep(5)
time.sleep(10) #程序有广告再等待
Click(400,400)
time.sleep(3)
def dec_to_36(num):
base = [str(x) for x in range(10)] + [chr(x) for x in range(ord('A'),ord("A")+26)]
# 前者把 0 ~ 9 转换成字符串存进列表 base 里,后者把 A ~ Z 存进列表
l = []
if num<0:
return "-"+dec_to_36(abs(num))
while True:
num,rem = divmod(num,36) # 求商 和 留余数
l.append(base[rem])
if num == 0:
return "".join(l[::-1])
def nowtime_to_str():
#将当前时间戳转化为36进制,约6位字符,减少文件名长度
unix_timestamp = int(time.time())
return(dec_to_36(unix_timestamp))
Lon() #打开模拟器打开程序
Connection()#首先连接上mumu
# time.sleep(10) #程序有广告再等待
# Click(400,400)
# time.sleep(3)
sched = BlockingScheduler()
sched.add_job(Screenshot, args=[None,], trigger='cron', day_of_week='mon-sun', hour='*', minute='*', second='*/30', )
sched.start()
图片进行训练的代码,我用GPU:
# -*- coding: utf-8 -*-
"""
Created on Tue Dec 24 09:02:10 2024
@author: YBK
"""
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms, models
from torch.utils.data import DataLoader
# 检查是否有可用的GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 数据预处理
transform = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# 加载数据集
train_dataset = datasets.ImageFolder(root=r'E:\.spyder-py3\jjxjwl\haveman0', transform=transform)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_dataset = datasets.ImageFolder(root=r'E:\.spyder-py3\jjxjwl\haveman00', transform=transform)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
# 选择预训练模型并修改分类头
model = models.resnet18(pretrained=True)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 2) # 2表示二分类
# 将模型移动到GPU
model = model.to(device)
# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# 训练模型
num_epochs = 50
for epoch in range(num_epochs):
model.train()
running_loss = 0.0
for inputs, labels in train_loader:
# 将数据和标签移动到GPU
inputs, labels = inputs.to(device), labels.to(device)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
running_loss += loss.item()
print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_loader)}')
# 验证模型
model.eval()
correct = 0
total = 0
with torch.no_grad():
for inputs, labels in val_loader:
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs)
_, predicted = torch.max(outputs.data, 1)
total += labels.size(0)
correct += (predicted == labels).sum().item()
print(f'Validation Accuracy: {100 * correct / total}%')
# 保存模型
torch.save(model.state_dict(), 'model.pth')
文件目录:
haveman00我只保存10张有人和无人的照片。
模型测试的代码:
# -*- coding: utf-8 -*-
"""
Created on Tue Dec 24 15:37:49 2024
@author: YBK
"""
import torch
import torch.nn as nn
from torchvision import transforms, models
from PIL import Image
# 加载模型
model = models.resnet18()
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 2) # 确保分类头与训练时相匹配
model.load_state_dict(torch.load('model.pth'))
model.eval() # 设置为评估模式
# 如果模型是在GPU上训练的,确保在推理时也使用GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
# 定义预处理操作
preprocess = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
# 加载并预处理图片
img_path = r'E:\sb\man\SOVSBI.jpg'
# img_path = r'E:\sb\noman\SOVR96.jpg'
# img_path = r'E:\sxtpz\OLD\20240820\0820100504.jpg'
image = Image.open(img_path).convert('RGB')
image_tensor = preprocess(image).unsqueeze(0) # 添加一个batch维度
image_tensor = image_tensor.to(device)
# 执行推理
with torch.no_grad():
outputs = model(image_tensor)
_, predicted = torch.max(outputs, 1)
# 解释预测结果
if predicted.item() == 0:
print("图片中有人")
else:
print("图片中没有人")
训练大概300张有人和无人的照片后,启用监控的程序,其中使用了番茄时间,即学习25分钟休息5分钟:
# -*- coding: utf-8 -*-
"""
Created on Sun Dec 22 11:01:23 2024
@author: YBK
"""
import os
import time
import cv2
import win32api
from apscheduler.schedulers.blocking import BlockingScheduler
from skimage.metrics import structural_similarity as ssim
import sqlite3
import datetime
from pathlib import Path
import torch
import torch.nn as nn
from torchvision import transforms, models
from PIL import Image
# 加载模型
model = models.resnet18()
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 2) # 确保分类头与训练时相匹配
model.load_state_dict(torch.load('model.pth'))
model.eval() # 设置为评估模式
# 如果模型是在GPU上训练的,确保在推理时也使用GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)
# 定义预处理操作
preprocess = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
db_filepath = Path(__file__).joinpath("../jk.db").resolve()
def insertdb(sj,furl,xl,zt): #插入一行数据zt1为有人0为无人
conn = sqlite3.connect(db_filepath, timeout=10, check_same_thread=False)
c = conn.cursor()
insert_query = "INSERT INTO jk(sj,furl,xl,zt) VALUES(?,?,?,?);"
insert_data = (sj,furl,xl,zt)
c.execute(insert_query,insert_data)
conn.commit()
c.close()
conn.close
def predict(image_path):
# 加载并预处理图片
img_path = image_path
image = Image.open(img_path).convert('RGB')
image_tensor = preprocess(image).unsqueeze(0) # 添加一个batch维度
image_tensor = image_tensor.to(device)
# 执行推理
with torch.no_grad():
outputs = model(image_tensor)
_, predicted = torch.max(outputs, 1)
# 解释预测结果
if predicted.item() == 0:
print("图片中有人")
result = '有人'
else:
print("图片中没有人")
result = '无人'
return result
#连接mumu模拟器 首先先断开服务再执行连接
def Connection():
cmd = 'adb kill-server'
tmp = os.popen(cmd).readlines()
cmd = 'adb connect 127.0.0.1:16384'
tmp = os.popen(cmd).readlines()
print(tmp)
#模拟点击 这个是代码的核心 也是繁琐的根源
def Click(x,y):
cmd = 'adb shell input tap {x1} {y1}'.format(x1 = x,y1 = y)
print(cmd)
os.system(cmd)
def getprezt(): #获取最后一个zt
conn = sqlite3.connect(db_filepath, timeout=10, check_same_thread=False)
c = conn.cursor()
cursor = c.execute("select zt from jk order by id desc limit 0,1;")
row = cursor.fetchone()
if row:
zt = row[0]
else:
zt = 1
c.close()
conn.close
return zt
def getprexl(): #获取最后一个zt
conn = sqlite3.connect(db_filepath, timeout=10, check_same_thread=False)
c = conn.cursor()
cursor = c.execute("select xl from jk order by id desc limit 0,1;")
row = cursor.fetchone()
if row:
xl = row[0]
else:
xl = 0
c.close()
conn.close
return xl
def isling(): #是否在响铃,因为摄像头响铃时间是1分钟,在1分钟内人如果有回来可以按掉
conn = sqlite3.connect(db_filepath, timeout=10, check_same_thread=False)
c = conn.cursor()
cursor = c.execute("select sj from jk where xl >= 1 order by id desc limit 0,1;")
row = cursor.fetchone()
xltime = row[0] #获取最后响铃时间
# 如果“最后响铃时间”距离现在大于1分钟,那么就没有在响铃
now = datetime.datetime.now()
nowtime = now.strftime("%Y-%m-%d %H:%M:%S")
delta = datetime.datetime.strptime(nowtime, "%Y-%m-%d %H:%M:%S") - datetime.datetime.strptime(xltime, "%Y-%m-%d %H:%M:%S")
seconds_diff = delta.total_seconds()
if seconds_diff > 60:
isling = False
print(f"“最后响铃时间{xltime}”距离现在大于1分钟")
else:
#如果“最后响铃时间”距离现在小于1分钟,那么获取真正的响铃时间
cursor = c.execute("select sj from jk where xl = 0 and sj < datetime('" + xltime + "') order by id desc limit 0,1;")
row = cursor.fetchone()
realtime = row[0]
delta = datetime.datetime.strptime(nowtime, "%Y-%m-%d %H:%M:%S") - datetime.datetime.strptime(realtime, "%Y-%m-%d %H:%M:%S")
seconds_diff = delta.total_seconds()
if seconds_diff <= 60:
isling = True #如果真正响铃时间距离现在也小于1分钟,那么就是真正在响铃
else:
isling = False
print(f"真正响铃时间{realtime}距离现在大于1分钟")
# 这里还要分析无人超过1分钟,但要求响铃的情况
c.close()
conn.close
return isling
def nomanmin():#无人情况经过多少个10秒
nomanonemin = 0
conn = sqlite3.connect(db_filepath, timeout=10, check_same_thread=False)
c = conn.cursor()
cursor = c.execute("SELECT count(*) FROM jk WHERE xl = 1 and sj > datetime('now', 'localtime', '-1 minute')")
row = cursor.fetchone()
tj = row[0]
print(f'无人统计={tj}')
c.close()
conn.close
nomanonemin = tj
return nomanonemin
def getmiaocha(): #计算秒差
seconds_diff = 0
now = datetime.datetime.now()
nowtime = now.strftime("%Y-%m-%d %H:%M:%S")
conn = sqlite3.connect(db_filepath, timeout=10, check_same_thread=False)
c = conn.cursor()
cursor = c.execute("select sj from jk where xl = 1 order by id desc limit 0,1;") #获取最后响铃时间
row = cursor.fetchone()
if row:
xltime = row[0]
c.close()
conn.close
if xltime:
delta = datetime.datetime.strptime(nowtime, "%Y-%m-%d %H:%M:%S") - datetime.datetime.strptime(xltime, "%Y-%m-%d %H:%M:%S")
seconds_diff = delta.total_seconds()
return seconds_diff
def xxms(): #休息时间经过多少个10秒
xxms = 0
conn = sqlite3.connect(db_filepath, timeout=10, check_same_thread=False)
c = conn.cursor()
cursor = c.execute("SELECT count(*) FROM jk WHERE xl = 2 and zt = 2 and sj > datetime('now', 'localtime', '-5 minute')")
row = cursor.fetchone()
tj = row[0]
print(f'休息统计={tj}')
c.close()
conn.close
xxms = tj
return xxms
def fqms(): #休息时间经过多少个10秒
fqms = 0
conn = sqlite3.connect(db_filepath, timeout=10, check_same_thread=False)
c = conn.cursor()
cursor = c.execute("SELECT count(*) FROM jk WHERE zt = 1 and sj > datetime('now', 'localtime', '-25 minute')")
row = cursor.fetchone()
tj = row[0]
print(f'学习统计={tj}')
c.close()
conn.close
fqms = tj
return fqms
#通过adb进行截图 让后面图片的匹配有基准
def Screenshot(jpgname):
if jpgname is None:
jpgname = nowtime_to_str()
os.system(f'adb shell screencap -p /sdcard/{jpgname}.png')
os.system(f'adb pull /sdcard/{jpgname}.png e:/sb/{jpgname}.png')
# 读取图片
image = cv2.imread(f'e:/sb/{jpgname}.png', cv2.IMREAD_UNCHANGED)
# 设定剪裁区域的坐标和尺寸
x, y, w, h = 150, 0, 690, 540 # 示例坐标和尺寸
# 使用numpy数组进行剪裁
cropped_image = image[y:y+h, x:x+w]
# 保存剪裁后的图片
cv2.imwrite(f'e:/sb/{jpgname}.jpg', cropped_image, [int(cv2.IMWRITE_JPEG_QUALITY), 90])
file_path = f'e:/sb/{jpgname}.png'
if os.path.exists(file_path): # 检查文件是否存在
os.remove(file_path) # 删除文件
else:
print(f"文件 {file_path} 不存在")
# 读取图片
imageA = cv2.imread(f'e:/sb/{jpgname}.jpg', cv2.IMREAD_GRAYSCALE) # 图片路径替换为你的图片路径
imageB = cv2.imread(r'E:\sb\chick.jpg', cv2.IMREAD_GRAYSCALE) # 图片路径替换为你的图片路径
imageC = cv2.imread(r'E:\sb\chick0.jpg', cv2.IMREAD_GRAYSCALE)
# 确保图片大小相同
imageA0 = cv2.resize(imageA, (imageB.shape[1], imageB.shape[0]))
imageA1 = cv2.resize(imageA, (imageC.shape[1], imageC.shape[0]))
# 计算SSIM
(score, diff) = ssim(imageA0, imageB, full=True)
diff = (diff * 255).astype("uint8")
(score1, diff) = ssim(imageA1, imageC, full=True)
print(f"SSIM: {score} , {score1}")
if score > 0.98:
Click(352, 308)
time.sleep(1)
if score1 > 0.94:
Click(452, 293)
time.sleep(1)
os.system(f'adb shell rm /sdcard/{jpgname}.png')
image_path = f'e:/sb/{jpgname}.jpg'
if score > 0.98 or score1 > 0.94:
print('点击激活监控')
else:
#先休息模式
xxmstj = xxms()
if xxmstj == 30:
#休息5分钟到,响铃回来
# Click(446, 867)
print('响铃回来')
xl = 0
zt = 0
elif xxmstj > 0 and xxmstj <= 29:
#继续休息
print('继续休息')
xl = 2
zt = 2
else:
#判断是否可以休息模式
fqmstj = fqms()
if fqmstj > 130:
# Click(446, 867)
print('响铃休息')
xl = 2
zt = 2
else:
#正常学习模式
prexl = getprexl()
nomanonemin = nomanmin()
xl = 0
zt = 0
is00 = isling()
if predict(image_path) == '有人':
zt = 1
xl = 0 #有人不响铃
if is00 and prexl == 1: #如果在响铃,则取消响铃
# Click(446, 867) #再点一次取消响铃
print('取消响铃')
else:
zt = 0
xl = 1 #无人就响铃
if (is00 != True and prexl == 0) and nomanonemin < 1:
#如果上次没有第一次响铃,或者无人超过1分钟(间距1分钟内有5个响铃),就响铃
# Click(446, 867) #再点一次取消响铃
print('点击响铃')
elif is00 != True and nomanonemin == 5:
# Click(446, 867) #再点一次取消响铃
print('无人再次点击响铃')
else:
print('在响铃了,不点击')
if nomanonemin == 5:
xl = 0 #无人超过1分钟时记录响铃为0,伪造记录用于确保程序运行
#正常学习模式结束
now = datetime.datetime.now()
insertdb(now.strftime("%Y-%m-%d %H:%M:%S"),f'e:/sb/{jpgname}.jpg',xl,zt)
#Click(446, 867) #响铃
#自动启动mumu模拟器 这是我的电脑上mumu 的位置
def Lon():
win32api.ShellExecute(0, 'open', r'D:\Program Files\Netease\MuMuPlayer-12.0\shell\MuMuPlayer.exe', '', '', 1)
time.sleep(25)
Connection()
Click(1349,674)
time.sleep(5)
def dec_to_36(num):
base = [str(x) for x in range(10)] + [chr(x) for x in range(ord('A'),ord("A")+26)]
# 前者把 0 ~ 9 转换成字符串存进列表 base 里,后者把 A ~ Z 存进列表
l = []
if num<0:
return "-"+dec_to_36(abs(num))
while True:
num,rem = divmod(num,36) # 求商 和 留余数
l.append(base[rem])
if num == 0:
return "".join(l[::-1])
def nowtime_to_str():
#将当前时间戳转化为36进制,约6位字符,减少文件名长度
unix_timestamp = int(time.time())
return(dec_to_36(unix_timestamp))
# Lon() #打开模拟器打开程序
Connection() #首先连接上mumu
# time.sleep(10) #程序有广告再等待
# Click(400,400)
# time.sleep(3)
# Screenshot('4')
sched = BlockingScheduler()
sched.add_job(Screenshot, args=[None,], trigger='cron', day_of_week='mon-sun', hour='*', minute='*', second='*/10', )
sched.start()
chick图片是网络问题吧:
chick0: