Initial commit

ccppi 2024-06-13 11:11:58 +02:00
commit 0b1d451e2b
15 changed files with 987 additions and 0 deletions

6
.gitignore vendored Normal file

@@ -0,0 +1,6 @@
*.venv*
*.hide
output/
*.db
*.csv
__pycache__

BIN
db/Cantons.db Normal file

Binary file not shown.

1
example.txt Normal file

@@ -0,0 +1 @@
python main.py --login EMAIL PW https://www.jobagent.ch/search?terms=Informatiker-Jobs&provinces=AG%2CSO&workload=40-60

55
lib/conf Normal file

@@ -0,0 +1,55 @@
[jobagent.ch]
USER = test@gmx.ch
PW = ASK
LOGINURL = https://www.jobagent.ch/user/login
SCRAPURL = https://www.jobagent.ch/search?terms=Automatiker&lra=0&as=0
TAG = Automatiker
[software-job.ch-application-engineer]
USER = NONE
PW = NONE
LOGINURL = NONE
SCRAPURL = https://software-job.ch/application-engineer
TAG = Informatiker
[software-job.ch]
USER = NONE
PW = NONE
LOGINURL = NONE
SCRAPURL = https://software-job.ch/python-entwicklung
TAG = Informatiker,Python
[jobs.ch_linux]
USER = NONE
PW = NONE
LOGINURL = NONE
SCRAPURL = https://www.jobs.ch/en/vacancies/?term=linux
TAG = Informatiker,Linux
[jobagent.ch-2]
USER = test@gmx.ch
PW = ASK
LOGINURL = https://www.jobagent.ch/user/login
SCRAPURL = https://www.jobagent.ch/search?terms=Informatiker&lra=0&as=0
TAG = Informatiker
[jobs.ch]
USER = NONE
PW = NONE
LOGINURL = NONE
SCRAPURL = https://www.jobs.ch/en/vacancies/?term=automatiker
TAG = Automatiker
[jobs.ch_informatiker]
USER = NONE
PW = NONE
LOGINURL = NONE
SCRAPURL = https://www.jobs.ch/en/vacancies/?term=informatiker
TAG = Informatiker
#https://www.jobagent.ch/search?terms=Automatiker&workload=60-100&lra=0&as=0
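# Schema note (summarized from the entries above, not part of the original file):
# each section is one scrape target. USER/PW/LOGINURL are NONE for sites without
# a login, PW = ASK makes the scraper prompt for the password at runtime, SCRAPURL
# is the first results page, and TAG is the comma-separated label list stored with
# every job row (e.g. Informatiker,Python).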

56
lib/config.py Normal file

@@ -0,0 +1,56 @@
import time
import configparser
from PySide6.QtWidgets import QDialog, QPushButton, QLineEdit, QVBoxLayout, QLabel

class Entry:
    user = 0
    pw = 0
    loginurl = 0
    scrapurl = 0
    tag = 0

    def __str__(self):
        return "values from Entry: %s %s PW %s %s" % (self.tag, self.user, self.loginurl, self.scrapurl)

    def input_pw(self, gui, message, worker):
        self.gui = gui
        if not self.gui:
            self.pw = input("Enter your password: ")
        if self.gui:
            worker.messageContent = self.scrapurl
            worker.dialog_closed = False
            worker.pwprompt.emit()  # signal the main thread to open the password dialog
            while not worker.dialog_closed:
                time.sleep(1)
            self.pw = worker.password

def readConfig(file, gui, worker):
    # static counter: each call returns the next section, then 0 after the last one
    if not hasattr(readConfig, "counter"):
        readConfig.counter = -1
    print(readConfig.counter)
    entry = Entry()
    config = configparser.RawConfigParser()
    buffer = config.read(file)
    print("buffer:", buffer)
    sections = config.sections()
    if readConfig.counter < (len(sections) - 1):
        readConfig.counter += 1
    else:
        readConfig.counter = -1
        return 0
    entry.user = config[sections[readConfig.counter]]["USER"]
    entry.pw = config[sections[readConfig.counter]]["PW"]
    entry.scrapurl = config[sections[readConfig.counter]]["SCRAPURL"]
    entry.tag = config[sections[readConfig.counter]]["TAG"]
    if entry.user != 0:
        if entry.pw == "ASK":
            entry.input_pw(gui, entry.user, worker)
    entry.loginurl = config[sections[readConfig.counter]]["LOGINURL"]
    print(entry)
    return entry
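
# Illustrative self-test (an addition, not part of the original commit): readConfig()
# hands out one section per call and returns 0 after the last, which is the contract
# sysparse.login_loop relies on. Run as `python config.py` from lib/ with a `conf`
# file present.
if __name__ == "__main__":
    entry = readConfig("conf", False, None)
    while entry != 0:
        print(entry.tag, "->", entry.scrapurl)
        entry = readConfig("conf", False, None)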

14
lib/dateconverter.py Normal file

@@ -0,0 +1,14 @@
def DateCHToUS(date):
    # 01.02.2010 --> 2010-02-01
    day = date[0:2]
    month = date[3:5]
    year = date[6:10]
    newdate = year + "-" + month + "-" + day
    return newdate
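
# Minimal self-check (an addition, not part of the original commit): Swiss
# DD.MM.YYYY in, ISO-style YYYY-MM-DD out, which sorts correctly as sqlite TEXT.
if __name__ == "__main__":
    assert DateCHToUS("01.02.2010") == "2010-02-01"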

78
lib/db.py Normal file

@@ -0,0 +1,78 @@
import sqlite3
import mmh3
import sys

DEBUG = False

def log(*s):
    if DEBUG:
        print(s)

def initdb(file):
    with sqlite3.connect(file) as connection:
        print("db connection", connection.total_changes)
        cursor = connection.cursor()
        cursor.execute("CREATE TABLE jobs (star TEXT, tag INT, title TEXT, location TEXT, company TEXT, link TEXT, pubdate TEXT, hash INT)")
    sys.exit()

def rmdb(file, table):
    with sqlite3.connect(file) as connection:
        question = input("Do you really want to empty the db (press Y)? ")
        if question == "Y":
            cursor = connection.cursor()
            drop_cmd = f"""DROP TABLE {table}"""
            cursor.execute(drop_cmd)
        else:
            print("aborting: table not removed")
    sys.exit()

def importdb(file, importdb, table):
    with sqlite3.connect(file) as connection:
        print("db connection", connection.total_changes)
        cmd = f"""ATTACH "{importdb}" AS regions"""
        cmd2 = f"""CREATE TABLE IF NOT EXISTS {table} AS SELECT * from regions.{table}"""
        cmd_view = f"""
        CREATE VIEW Canton_Filter
        AS
        SELECT * FROM jobs as b
        WHERE EXISTS
        (SELECT GDENAME FROM {table} as w
        where w.GDEKT = 'ZH' AND
        b.location LIKE GDENAME);"""
        cursor = connection.cursor()
        cursor.execute(cmd)
        print(cmd, cmd2)
        cursor.execute(cmd2)
        cursor.execute(cmd_view)
        print("db connection", connection.total_changes)

def createnwview(file):
    with sqlite3.connect(file) as connection:
        cmd_create_nw_table = """CREATE VIEW "Nordwest-SCHWEIZ" AS SELECT * FROM jobs as b
        WHERE EXISTS
        (SELECT GDENAME FROM Cantons as w
        where w.GDEKT = 'ZH' AND
        b.location LIKE GDENAME)
        OR EXISTS
        (SELECT GDENAME FROM Cantons as w
        where w.GDEKT = 'AG' AND
        b.location LIKE GDENAME)
        OR EXISTS
        (SELECT GDENAME FROM Cantons as w
        where w.GDEKT = 'SO' AND
        b.location LIKE GDENAME)"""
        cursor = connection.cursor()
        cursor.execute(cmd_create_nw_table)
        print("db connection", connection.total_changes)

def writedb(jobs):
    with sqlite3.connect("../db/sqlite3.db") as connection:
        print("db connection", connection.total_changes)
        cursor = connection.cursor()
        for i3, job in enumerate(jobs):
            # deduplicate on a murmur3 hash of the fields that identify a listing
            hash1 = mmh3.hash(job.title + job.company + job.location + job.date)
            log(hash1)
            if cursor.execute("SELECT * FROM jobs WHERE hash = ?", (hash1,)).fetchone() is not None:
                log("Hash already exists")
            else:
                print("NEW_ENTRY")
                cursor.execute("INSERT INTO jobs (star,tag,title,company,location,link,pubdate,hash) VALUES (?,?,?,?,?,?,?,?)", (job.starred, job.tag, job.title, job.company, job.location, job.link, job.date, hash1))

297
lib/gui.py Normal file

@@ -0,0 +1,297 @@
from PySide6.QtWidgets import QApplication, QWidget, QMainWindow, QTableWidget, QVBoxLayout, QTableWidgetItem, QPushButton, QHBoxLayout, QTableView, QLineEdit, QDialog, QLabel, QTextEdit, QCheckBox, QComboBox
from PySide6.QtWebEngineWidgets import QWebEngineView
from PySide6.QtCore import QUrl, Qt, QSortFilterProxyModel, qDebug, QSize, QObject, QThread, Signal
from PySide6.QtSql import QSqlDatabase, QSqlTableModel, QSqlQueryModel, QSqlQuery
import sysparse
import sys

Cantons = ["AG", "ZH", "BE", "SG", "SO"]

class Worker(QObject):
    pwprompt = Signal()
    pw = Signal(str)
    finished = Signal()
    dialog_closed = True
    password = ['empty']

    def run(self):
        sysparse.parse(config="conf", worker=self)

    def return_pw(self, x):
        self.password = [x]
        self.dialog_closed = True

class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.w = None
        self.cmdCanton = ''
        self.initcmd = 'SELECT * FROM jobs as b'
        self.customcmd = ''
        self.cmd = ''
        self.setWindowTitle("DB_Inspector")
        self.layout = QVBoxLayout()
        self.layout2 = QHBoxLayout()
        self.b_canton = QPushButton("Modify Filter")
        self.b_canton.clicked.connect(self.showQueryWindow)
        self.browser = QWebEngineView()
        self.browser.setUrl(QUrl("https://jobagent.ch"))
        self.EditQuery = QLineEdit()
        self.EditQuery.returnPressed.connect(self.queryEditLine)
        self.model = QSqlTableModel(self)
        self.model.setTable("jobs")
        self.model.select()
        self.view = QTableView()
        self.view.setModel(self.model)
        self.setProxyViewSettings()
        # connect once; the connection lives on the view, which survives model swaps
        self.view.clicked.connect(self.cell_clicked)
        self.PsyncDB = QPushButton("Perform sync according to config file")
        self.PsyncDB.clicked.connect(self.runWorker)
        self.layout.addWidget(self.view)
        self.layout.addWidget(self.b_canton)
        self.layout.addWidget(self.EditQuery)
        self.layout.addWidget(self.PsyncDB)
        self.layout2.addLayout(self.layout)
        self.layout2.addWidget(self.browser)
        widget = QWidget()
        widget.setLayout(self.layout2)
        self.setCentralWidget(widget)

    def setProxyViewSettings(self):
        self.view.resizeColumnsToContents()
        self.view.setColumnWidth(5, 10)
        self.view.hideColumn(7)
        self.view.setSortingEnabled(True)

    def runWorker(self):
        self.thread = QThread()
        self.worker = Worker()
        self.worker.moveToThread(self.thread)
        self.thread.started.connect(self.disable_PsyncDB)
        self.thread.started.connect(self.worker.run)
        self.worker.pwprompt.connect(self.showDialog)
        self.worker.finished.connect(self.thread.quit)
        self.worker.finished.connect(self.enable_PsyncDB)
        self.thread.start()

    def disable_PsyncDB(self):
        self.PsyncDB.setText("Sync Running...")
        self.PsyncDB.setEnabled(False)

    def enable_PsyncDB(self):
        self.PsyncDB.setEnabled(True)
        self.PsyncDB.setText("Perform another sync according to config file")

    def showDialog(self):
        w = PWPrompt()
        w.set_MSG(self.worker.messageContent)
        ret = w.exec()
        self.pw = w.pw
        self.worker.password = w.pw
        print("showDialog,self.pw:", self.pw)
        self.worker.dialog_closed = True
        if ret == QDialog.Rejected:
            return 1

    def showQueryWindow(self, checked):
        if self.w is None:
            self.w = QueryWindow()
        self.w.show()

    def filter_canton(self, canton):
        if canton != "ALL":
            self.cmdCanton = f"""
            WHERE EXISTS
            (SELECT GDENAME FROM Cantons as w
            where w.GDEKT = '{canton}' AND
            b.location LIKE GDENAME) """
            print("cmd canton:", self.cmdCanton)
        else:
            self.cmdCanton = ' '
            print("disable filter")
        # self.customSQL(self.cmd)

    def queryEditLine(self):
        self.cmd = self.EditQuery.text()
        print(self.initcmd + self.cmdCanton + self.customcmd + self.cmd)
        self.customSQL(self.initcmd + self.cmdCanton + self.customcmd + self.cmd)

    def cell_clicked(self):
        x = self.view.selectionModel().currentIndex().row()
        y = self.view.selectionModel().currentIndex().column()
        data = self.view.model().index(x, 5).data()
        print("cell clicked:", x, " / ", y, "-->", data)
        self.browser.setUrl(QUrl(data))

    def customSQL(self, cmd):
        print("Run SQL Query", cmd)
        self.model.setTable("")
        self.model.setQuery(cmd + " ;")
        self.proxymodel2 = QSortFilterProxyModel(self)
        self.proxymodel2.setSourceModel(self.model)
        self.view.setModel(self.proxymodel2)
        self.setProxyViewSettings()

class PWPrompt(QDialog):
    def __init__(self):
        super().__init__()
        self.pw = ''
        self.MSG1 = QLabel("Please Enter Password")
        self.MSG = QLabel("ACCOUNT")
        self.BOK = QPushButton("OK")
        self.BCancel = QPushButton("Cancel")
        self.EPW = QLineEdit()
        self.EPW.setEchoMode(QLineEdit.EchoMode.Password)
        self.BOK.clicked.connect(self.confirm)
        self.BCancel.clicked.connect(self.reject)
        self.VLayout = QVBoxLayout()
        self.VLayout.addWidget(self.MSG1)
        self.VLayout.addWidget(self.MSG)
        self.VLayout.addWidget(self.EPW)
        self.VLayout.addWidget(self.BOK)
        self.VLayout.addWidget(self.BCancel)
        self.setLayout(self.VLayout)

    def confirm(self):
        self.pw = self.EPW.text()
        self.accept()

    def set_MSG(self, message):
        self.MSG.setText(message)

class QueryWindow(QWidget):
    def __init__(self):
        super().__init__()
        self.FlagShow = 0
        self.label = QLabel("Query settings")
        self.setWindowTitle("Query")
        self.EditQuery = QTextEdit()
        self.BSubmit = QPushButton("Submit")
        self.BSubmit.clicked.connect(self.submit)
        self.LFilter = QLabel()
        self.LFilter.setText("Filter by Cantons")
        self.CFilter = QComboBox()
        self.CFilter.addItem("ALL")
        for Canton in Cantons:
            self.CFilter.addItem(Canton)
        self.CFilter.currentTextChanged.connect(window.filter_canton)
        self.CFilter.currentTextChanged.connect(self.setTFilter)
        self.TFilter = QTextEdit()
        self.TFilter.setReadOnly(True)
        self.TInitCmd = QLabel()
        self.TInitCmd.setText(window.initcmd)
        self.vLayout = QVBoxLayout()
        self.vLayout.addWidget(self.TInitCmd)
        self.vLayout.addWidget(self.TFilter)
        self.vLayout.addWidget(self.EditQuery)
        self.vLayout.addWidget(self.BSubmit)
        self.LShowViews = QLabel()
        self.LShowViews.setText("Custom Views in Database")
        self.CShowViews = QComboBox()
        items = self.getViews()
        for item in items:
            self.CShowViews.addItem(item)
        self.CShowViews.currentTextChanged.connect(self.setView)
        self.PApplyView = QCheckBox()
        self.PApplyView.setText("Apply View")
        self.PApplyView.clicked.connect(self.setView)
        self.vrLayout = QVBoxLayout()
        self.vrLayout.addWidget(self.LFilter)
        self.vrLayout.addWidget(self.CFilter)
        self.vrLayout.addWidget(self.LShowViews)
        self.vrLayout.addWidget(self.CShowViews)
        self.vrLayout.addWidget(self.PApplyView)
        self.WvrLayout = QWidget()
        self.WvrLayout.setLayout(self.vrLayout)
        self.WvrLayout.setMaximumSize(QSize(200, 200))
        self.hLayout = QHBoxLayout()
        self.hLayout.addLayout(self.vLayout)
        self.hLayout.addWidget(self.WvrLayout)
        self.setLayout(self.hLayout)
        self.EditQuery.setText(window.customcmd)
        print("Comboshowview:", self.CShowViews.currentText())

    def getViews(self):
        item = []
        statement = "SELECT name FROM sqlite_master where type='view'"
        query = QSqlQuery(statement)
        while query.next():
            print(query.value(0))
            item.append(query.value(0))
        print(query.lastError())
        return item

    def setView(self):
        if self.PApplyView.isChecked():
            self.view = self.CShowViews.currentText()
            print("Selected View:", self.view)
            window.initcmd = f"""SELECT * FROM '{self.view}'"""
            print("window.initcmd:", window.initcmd)
        else:
            window.initcmd = 'SELECT * FROM jobs as b '
            print("View unchecked")
        self.TInitCmd.setText(window.initcmd)

    def setTFilter(self):
        self.TFilter.setText(window.cmdCanton)

    def submit(self):
        self.setView()
        window.customcmd = self.EditQuery.toPlainText()
        window.queryEditLine()
        # print("text:", window.customcmd)
        # window.customSQL(window.customcmd)
        self.hide()

    def out(self, s):
        print("Current selection", s)

app = QApplication(sys.argv)
con = QSqlDatabase.addDatabase("QSQLITE")
con.setDatabaseName("../db/sqlite3.db")
if not con.open():
    qDebug("Error on opening sql database")
    sys.exit(1)
window = MainWindow()
window.show()
app.exec()
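
# Sync flow (summary of the code above, added for orientation): runWorker() moves a
# Worker onto a QThread and starts the config-driven scrape; when an entry has
# PW = ASK, the worker emits pwprompt, the main thread opens PWPrompt in
# showDialog(), and the worker polls dialog_closed until the password is set.
# Launch from lib/ so the relative path ../db/sqlite3.db resolves.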

166
lib/helpers.py Normal file

@@ -0,0 +1,166 @@
import requests
from bs4 import BeautifulSoup
from enum import Enum
import re
from dateconverter import *
from datetime import datetime

DEBUG = False

def log(*s):
    if DEBUG:
        print(s)

class mode():
    # flag names consumed by finder() via **modes
    DEFAULT = 0
    LINK = 0
    LOCATION_CLEANUP = 0
    SWAPDATE = 0
    CLEANDATE = 0
    ATTRS = 0

months = [
    ('January', '01'),
    ('February', '02'),
    ('March', '03'),
    ('April', '04'),
    ('May', '05'),
    ('June', '06'),
    ('July', '07'),
    ('August', '08'),
    ('September', '09'),
    ('October', '10'),
    ('November', '11'),
    ('December', '12')]

class item():
    def __init__(self, tag, tag_content, index):
        self.tag = tag
        self.tag_content = tag_content
        self.index = index

class job():
    def __init__(self, title, profession, company, location, date, description, link, tag, starred):
        self.title = title
        self.profession = profession
        self.company = company
        self.location = location
        self.date = date
        self.description = description
        self.link = link
        self.tag = tag
        self.starred = starred

    def __str__(self):
        return "%s| %s|%s|%s|%s|%s|%s" % (self.title, self.profession, self.company, self.location, self.date, self.description, self.link)

def finder(results, item, **modes):
    ATTRS = modes.get('ATTRS', 0)
    LOCATION_CLEANUP = modes.get('LOCATION_CLEANUP', 0)
    LINK = modes.get('LINK', 0)
    SWAPDATE = modes.get('SWAPDATE', 0)
    CLEANDATE = modes.get('CLEANDATE', 0)
    BASEURL = modes.get('BASEURL', '')
    content = []
    i = item.index
    log("Modes:", modes)
    for entry in results:
        if ATTRS == 1:
            result = entry.findAll(item.tag, attrs=item.tag_content)
            log(item.tag_content)
        else:
            result = entry.findAll(item.tag, class_=item.tag_content)
        log("found:", len(result))
        if result:
            log("there's a result")
            if i > (len(result) - 1):
                log("len:", len(result) - 1, "i:", i)
                log("index out of bounds, falling back to the last element")
                i = len(result) - 1
            result2 = result[i]
            if LOCATION_CLEANUP == 1:
                location = CleanLocation(result2.text.strip())
                content.append(location)
            elif LINK == 1:
                href = result2.get("href")
                if BASEURL:
                    href = BASEURL + href
                content.append(href)
            elif SWAPDATE == 1:
                content.append(DateCHToUS(result2.text.strip()))
            elif CLEANDATE == 1:
                content.append(jobs_ch_clean_date(result2.text.strip()))
            else:
                content.append(result2.text.strip())
        if not result:
            # keep the arrays aligned even when a field is missing
            if CLEANDATE:
                content.append(datetime.today().strftime('%Y-%m-%d'))
            else:
                content.append("NOTFound")
    return content

def CleanLocation(location):
    # strip postcode patterns and spaces, e.g. "CH-8000 Zürich" -> "Zürich"
    location = re.sub('CH-[0-9]{4}|[0-9]{4}| ', '', location)
    return location

def arrayToClass(titles, companys, locations, dates, links, tag):
    jobs = []
    if len(titles) == len(companys) == len(locations) == len(dates):
        log("len:", len(titles))
        for i, title in enumerate(titles):
            jobs.append(job(title, "test_prof", companys[i], locations[i], dates[i], "test_desc", links[i], tag, 0))
            log(jobs[i])
        return jobs
    else:
        print("Something went wrong: unequal length of data arrays")
        return 0

def jobs_ch_clean_date(date):
    newdate = date[11:]  # remove the leading 'Published: '
    return jobs_ch_switch_month(newdate)

def jobs_ch_switch_month(date):
    # "01 June 2024" -> "2024-06-01"
    day = date[0:2]
    newmonth = date[3:len(date) - 5]
    for month in months:
        if month[0] == newmonth:
            newmonth = month[1]
    year = date[len(date) - 2:]
    return '20' + year + '-' + newmonth + '-' + day
def extractDomain(url):
    pattern = r'https:\/\/.*\..+?(?=\/)'
    domain = re.match(pattern, url)
    if domain:
        return domain.group()
    else:
        return 0

def makeSession(url):
    with requests.Session() as session:
        page = session.get(url)
        return session
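
# Usage sketch (illustrative; the descriptors below are the ones scrap_jobagent
# actually uses): finder() extracts one tag per result block, addressed by an
# item(tag, class_or_attrs, index) descriptor:
#   title = item("span", "jobtitle", 0)
#   ar_title = finder(results, title)                     # plain text
#   link = item("a", "title", 0)
#   ar_link = finder(results, link, LINK=1, BASEURL="https://www.jobagent.ch")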

38
lib/login.py Normal file

@@ -0,0 +1,38 @@
import requests
from helpers import *

def login(entry):
    user = entry.user
    pw = entry.pw
    loginurl = entry.loginurl
    scrapurl = entry.scrapurl
    with requests.Session() as session:
        headers = {
            "Host": "www.jobagent.ch",
            "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:124.0) Gecko/20100101 Firefox/124.0",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.5",
            "Accept-Encoding": "gzip, deflate, br",
            "Content-Type": "application/x-www-form-urlencoded",
            # "Content-Length" is computed by requests; hardcoding it can corrupt the request
            "Origin": "https://www.jobagent.ch",
            # "Connection": "keep-alive",
            "Referer": "https://www.jobagent.ch/user/login",
            # "Upgrade-Insecure-Requests": "1",
            # "Sec-Fetch-Dest": "document",
            # "Sec-Fetch-Mode": "navigate",
            # "Sec-Fetch-Site": "same-origin",
            # "DNT": "1",
            # "Sec-GPC": "1"
        }
        r = session.get(loginurl)  # prime the session with cookies from the login page
        payload = {"redirectUrl": "", "email": user, "password": pw}
        resp = session.post(loginurl, data=payload, headers=headers)
        print("response from login attempt", resp.url)
        if resp.url == 'https://www.jobagent.ch/user/login?error':
            print("Error on login")
            return -1
        r = session.get(scrapurl)
        return session
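
# Usage sketch (illustrative): login() yields a logged-in requests.Session, or -1
# on a failed login, so callers can retry after asking for a new password (see
# sysparse.login_loop):
#   session = login(entry)   # entry: config.Entry with user, pw and loginurl set
#   if session != -1:
#       page = session.get(entry.scrapurl)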

6
lib/main.py Normal file

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
from sysparse import parse
parse()

156
lib/scrap_jobs.py Normal file

@@ -0,0 +1,156 @@
from helpers import *

DEBUG = False

def log(*s):
    if DEBUG:
        print(s)

def indeed_com(url, session):
    # note: incomplete scraper; it collects the fields but does not yet build or
    # return job objects, and choose_scraper never dispatches to it
    jobs = []
    if session == 0:
        with requests.Session() as session:
            page = session.get(url)
            print(page)
    else:
        page = session.get(url)
        print(page)
    soup = BeautifulSoup(page.content, "html.parser")
    results = soup.find_all("li", class_='css-5lfssm eu4oa1w0')
    location = item("p", {'data-testid': 'text-location'}, 0)
    ar_location = finder(results, location, LOCATION_CLEANUP=1, ATTRS=1)
    company = item("p", {'data-testid': 'company-name'}, 0)
    ar_company = finder(results, company, ATTRS=1)
    title = item("a", 'jobTitle', 0)
    ar_title = finder(results, title)
    date = item("span", "Span-sc-1ybanni-0 Text__span-sc-1lu7urs-12 Text-sc-1lu7urs-13 krGudM hUhFmL", 0)
    ar_date = finder(results, date, CLEANDATE=1)

def scrap_jobs(url, entry, session):
    jobs = []
    log("in scrap jobs,url", url)
    if session == 0:
        with requests.Session() as session:
            page = session.get(url)
            log(page)
    else:
        page = session.get(url)
        log(page)
    soup = BeautifulSoup(page.content, "html.parser")
    results = soup.find_all("div", attrs={"data-feat": "searched_jobs"})
    location_class = "P-sc-hyu5hk-0 Text__p2-sc-1lu7urs-10 Span-sc-1ybanni-0 Text__span-sc-1lu7urs-12 Text-sc-1lu7urs-13 jZCxUn"
    location = item("p", location_class, 0)
    ar_location = finder(results, location, LOCATION_CLEANUP=1)
    company_class = location_class  # same class string; the company sits at index 3
    company = item("p", company_class, 3)
    ar_company = finder(results, company, DEFAULT=1)
    title = item("span", "Span-sc-1ybanni-0 Text__span-sc-1lu7urs-12 Text-sc-1lu7urs-13 VacancyItem___StyledText2-sc-iugtv6-5 iaJYDR jlFpCz dMwMcR", 0)
    ar_title = finder(results, title, DEFAULT=1)
    date = item("span", "Span-sc-1ybanni-0 Text__span-sc-1lu7urs-12 Text-sc-1lu7urs-13 krGudM hUhFmL", 0)
    ar_date = finder(results, date, CLEANDATE=1)
    link = item("a", "Link__ExtendedRR6Link-sc-czsz28-1 khAvCu Link-sc-czsz28-2 VacancyLink___StyledLink-sc-ufp08j-0 dXKwhi dDgwgk", 0)
    ar_link = finder(results, link, LINK=1, BASEURL="https://jobs.ch")
    tag = entry.tag  # from the config file
    return arrayToClass(ar_title, ar_company, ar_location, ar_date, ar_link, tag)

def next_url_jobs_ch(url, session, baseurl):
    next_link_str = ''
    if session == 0:
        with requests.Session() as session:
            page = session.get(url)
    else:
        page = session.get(url)
    soup = BeautifulSoup(page.content, "html.parser")
    result_next = soup.findAll("div", attrs={"data-cy": "paginator"})
    next_ = item("a", {"data-cy": "paginator-next"}, 0)
    next_link = finder(result_next, next_, ATTRS=1, LINK=1)
    if next_link:
        if next_link[0] != "NOTFound":
            next_link_str = baseurl + str(next_link[0])
            log(next_link_str)
        else:
            return 0
    if next_link_str != '':
        return next_link_str
    else:
        return 0

def next_url_jobagent(base_url, session, c):  # deprecated, will be removed in the future
    found = False
    if session == 0:
        with requests.Session() as session:
            page = session.get(base_url)
    else:
        page = session.get(base_url)
    soup = BeautifulSoup(page.content, "html.parser")
    results = soup.find("ul", class_="pagination")
    if results is None:
        print("pagination next not found, probably end of pages:")
    next_url_names = soup.find_all("a", class_="btn btn-sm btn-secondary")
    for i2 in next_url_names:
        striped_string = i2.text.strip()
        log(i2.text.strip(), "stripped:", striped_string)
        if striped_string == "Nächste Seite":
            log(i2)
            next_url = i2.get("href")
            log("url of next site")
            found = True
            return next_url
    if found == False:
        return 0

def scrap_jobagent(url, entry, session):
    jobs = []
    log("in scrap jobs,url", url)
    if session == 0:
        with requests.Session() as session:
            page = session.get(url)
            log(page)
    else:
        page = session.get(url)
        log(page)
    soup = BeautifulSoup(page.content, "html.parser")
    results = soup.find_all("li", class_="item")
    title = item("span", "jobtitle", 0)
    ar_title = finder(results, title)
    location = item("span", "location", 0)
    ar_location = finder(results, location, LOCATION_CLEANUP=1)
    company = item("span", "company", 0)
    ar_company = finder(results, company, DEFAULT=1)
    link = item("a", "title", 0)
    ar_link = finder(results, link, LINK=1)
    date = item("span", "pubdate", 0)
    ar_date = finder(results, date, SWAPDATE=1)
    tag = entry.tag
    return arrayToClass(ar_title, ar_company, ar_location, ar_date, ar_link, tag)
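
# Usage sketch (illustrative): both page scrapers share one signature and return a
# list of helpers.job objects ready for db.writedb(); pass session=0 to scrape
# anonymously with a throwaway session:
#   jobs = scrap_jobagent(entry.scrapurl, entry, 0)
#   next_page = next_url_jobagent(entry.scrapurl, 0, 0)  # 0 when there is no next page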

105
lib/sysparse.py Normal file

@@ -0,0 +1,105 @@
import argparse
import config
import sys
from enum import IntEnum
from scrap_jobs import *
from login import *
from time import sleep
from db import *

def choose_scraper(entry, session):
    if not session:
        session = requests.Session()
    domain = extractDomain(entry.scrapurl)
    match domain:
        case 'https://www.jobs.ch':
            runner(entry, session, scrap_jobs, next_url_jobs_ch)
        case 'https://software-job.ch':
            runner(entry, session, scrap_jobagent, next_url_jobagent)
        case 'https://www.jobagent.ch':
            runner(entry, session, scrap_jobagent, next_url_jobagent)

def parse(**kwargs):
    session = 0
    if len(sys.argv) > 1:
        worker = 0
        parser = argparse.ArgumentParser()
        parser.add_argument("-c", "--config", help="Specify a config file from which to scrape the jobs")
        parser.add_argument("-t", "--test", help="only for test purposes while developing; takes a URL to scrape", metavar='URL')
        parser.add_argument("--importregiondb", help="Import a database used for querying by Regions or Cantons", action="store_true")
        parser.add_argument("--initdb", help="Initialize a new db from scratch without entries", action="store_true")
        parser.add_argument("--rmdb", help="!!remove existing db!! DATA LOSS!!", action="store_true")
        parser.add_argument("--login", nargs=3, help="login by specifying username and password for a given url", metavar=('USERNAME', 'PASSWORD', 'URL'))
        parser.add_argument("--createnwview", help="Create a VIEW for the Region Nordwest Schweiz", action="store_true")
        args = parser.parse_args()
        if args.test:
            # test mode: wrap the URL in a minimal Entry so choose_scraper/runner
            # can use it (the TEST tag is a placeholder)
            entry = config.Entry()
            entry.scrapurl = args.test
            entry.tag = "TEST"
            session = makeSession(args.test)
            choose_scraper(entry, session)
        if args.importregiondb:
            importdb("../db/sqlite3.db", "../db/Cantons.db", "Cantons")
        if args.initdb:
            initdb("../db/sqlite3.db")
        if args.rmdb:
            rmdb("../db/sqlite3.db", "jobs")
        if args.login:
            # login() expects an Entry, so wrap the three CLI values in one
            entry = config.Entry()
            entry.user, entry.pw, entry.loginurl = args.login
            entry.scrapurl = entry.loginurl
            session = login(entry)
            choose_scraper(entry, session)
        if args.config:
            login_loop(args.config, False, worker)
        if args.createnwview:
            createnwview("../db/sqlite3.db")
    if len(kwargs) > 0:
        print("no sysargs given, running as a module")
        vconfig = kwargs.get('config')
        worker = kwargs.get('worker')
        print("config:", vconfig)
        if vconfig:
            login_loop(vconfig, True, worker)
        worker.finished.emit()
        print("finished sync job")

def login_loop(config_file, gui, worker):
    ret = -1
    ret_login = 0
    session = 0
    while ret != 0:
        ret = entry2 = config.readConfig(config_file, gui, worker)
        print(entry2)
        if ret != 0 and ret_login != 1:
            if entry2.loginurl != 'NONE':
                session = -1
                while session == -1:
                    session = login(entry2)
                    if session == -1:
                        ret_login = entry2.input_pw(gui, entry2.user, worker)
            choose_scraper(entry2, session)

def runner(entry, session, scrap_func, next_url_func):
    i = 0
    b_url = entry.scrapurl
    while b_url != 0 and i < 50:
        sleep(0.3)
        if b_url:
            domain = extractDomain(b_url)
            print(domain)
            if domain == 'https://www.jobagent.ch' or domain == 'https://software-job.ch':
                jobs = scrap_func(b_url, entry, session)
                writedb(jobs)
                b_url = next_url_func(b_url, session, 0)
            elif domain == 'https://www.jobs.ch':
                jobs = scrap_func(b_url, entry, session)
                writedb(jobs)
                b_url = next_url_func(b_url, session, "https://www.jobs.ch")
            if b_url != 0:
                print("main:" + b_url)
            if b_url == 0:
                print("End of listed items, or did not find any other Nächste Seite buttons")
        i = i + 1
        print(i)
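
# CLI usage sketch (flags as defined above; paths resolve relative to lib/):
#   python main.py --initdb          # create ../db/sqlite3.db with the jobs table
#   python main.py --importregiondb  # copy the Cantons table from ../db/Cantons.db
#   python main.py --createnwview    # add the "Nordwest-SCHWEIZ" view
#   python main.py -c conf           # scrape every section of the config file
#   python main.py --login EMAIL PW URL   # see example.txt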

4
querry.note Normal file

@@ -0,0 +1,4 @@
ATTACH 'Cantons.db' AS Cantons;
ATTACH '2.db' AS db2;
SELECT * FROM Jobs.jobs as b WHERE EXISTS (SELECT GDENAME FROM Cantons.cantons as w where w.GDEKT = 'ZH' AND b.location LIKE GDENAME);

5
requirements.txt Normal file

@@ -0,0 +1,5 @@
beautifulsoup4==4.12.3
mmh3==4.1.0
numpy==1.26.4
Requests==2.31.0
pyside6