This commit is contained in:
2022-10-20 10:28:21 +02:00
commit 205b16263c
10 changed files with 544 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
venv
.vscode
*.pyc

72
main.py Normal file
View File

@@ -0,0 +1,72 @@
import asyncio
import hashlib
import os
from asyncio.queues import Queue
from datetime import date

import aiogram
from sqlitedict import SqliteDict

from parsers import PARSERS
from parsers.base import Announcement
# Telegram credentials.  The token can be supplied via the BOT_TOKEN
# environment variable so the secret does not have to live in source control;
# the hard-coded fallback preserves the previous behavior.
# NOTE(review): the fallback value looks like a placeholder — confirm the real
# token is provided through the environment in production.
BOT_TOKEN = os.environ.get("BOT_TOKEN", "1111111111:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")
# Numeric id of the target channel the notifications are posted to.
CHANNEL_ID = -1001872853621
bot = aiogram.Bot(BOT_TOKEN)
async def start_parsers(db: SqliteDict, queue: Queue) -> None:
    """Run every registered parser concurrently, sharing the db and queue."""
    parse_loops = (parser.start_parse(db, queue) for parser in PARSERS)
    await asyncio.gather(*parse_loops)
async def notify(item: Announcement) -> bool:
    """Post the announcement link to the channel; return True on success."""
    try:
        await bot.send_message(CHANNEL_ID, item.link)
    except aiogram.exceptions.TelegramAPIError:
        # Delivery failed — the caller decides whether to retry later.
        return False
    return True
async def start_notifier(db: SqliteDict, queue: Queue) -> None:
    """Consume announcements from the queue and post the fresh ones.

    Deduplication: each link is keyed in `db` by its MD5 digest, mapped to
    the ISO date of the last processed update.  Announcements whose update
    date is not today are only recorded as seen, never sent.
    """
    while True:
        announcement: Announcement = await queue.get()
        digest = hashlib.md5(announcement.link.encode()).hexdigest()
        seen_iso_date = db.get(digest, None)
        update_date = announcement.update_date
        if seen_iso_date != update_date.isoformat():
            if update_date == date.today():
                # Fresh item: record it only if the notification went through,
                # so a failed send is retried on the next sighting.
                should_record = await notify(announcement)
            else:
                # Stale item: just mark it as seen.
                should_record = True
            if should_record:
                db[digest] = update_date.isoformat()
                db.commit()
        await asyncio.sleep(1)
async def main():
    """Wire up shared state and run the parsers and the notifier side by side."""
    database = SqliteDict("notify.sqlite")
    announcement_queue = Queue()
    try:
        await asyncio.gather(
            start_parsers(database, announcement_queue),
            start_notifier(database, announcement_queue),
        )
    finally:
        # Always release the sqlite handle, even if a task crashes.
        database.close()
if __name__ == "__main__":
    # Restart loop: keep the bot alive across transient crashes.  A bare
    # `while True: asyncio.run(main())` exits on the first unhandled
    # exception, which defeats the purpose of the loop — so failures are
    # logged and retried after a short back-off, while Ctrl-C still exits.
    import time
    import traceback

    while True:
        try:
            asyncio.run(main())
        except KeyboardInterrupt:
            break
        except Exception:
            traceback.print_exc()
            time.sleep(5)  # avoid a hot crash loop on persistent failures

15
parsers/__init__.py Normal file
View File

@@ -0,0 +1,15 @@
from .base import BaseParser
from .halooglasi import HalooglasiParser
from .sasomange import SasomangleParser
from .oglasi import OglasiParser
from .imovina import ImovinaParser
from .fzida import FzidaParser
# Registry of every available site parser.  The entries are the parser
# *classes* themselves (the BaseParser API is classmethod-based and the
# parsers are never instantiated), hence list[type[BaseParser]] — the
# original list[BaseParser] annotation claimed instances.
PARSERS: list[type[BaseParser]] = [
    HalooglasiParser,
    SasomangleParser,
    OglasiParser,
    ImovinaParser,
    FzidaParser,
]

117
parsers/base.py Normal file
View File

@@ -0,0 +1,117 @@
import abc
import asyncio
from dataclasses import dataclass
from datetime import date
import hashlib
from typing import Optional
from bs4 import BeautifulSoup
from sqlitedict import SqliteDict
import httpx
@dataclass
class AnnoncementPreview:
    """A search-result entry: just enough data to fetch the full announcement.

    NOTE(review): "Annoncement" is a misspelling of "Announcement"; the name
    is kept because every parser module references it.
    """

    # Listing title as shown on the search page.
    title: str
    # Date the listing was last updated; None when the search page does not
    # expose a date (some parsers construct previews with update_date=None).
    update_date: Optional[date]
    # Absolute URL of the announcement's detail page.
    link: str
@dataclass
class Announcement:
    """A fully parsed announcement, ready to be queued for notification."""

    # Listing title.
    title: str
    # Free-text description scraped from the detail page.
    description: str
    # Listing price — apparently in EUR.  NOTE(review): annotated as int, but
    # the parsers pass floats — confirm the intended type.
    price: int
    # Date the listing was last updated/verified on the source site.
    update_date: date
    # Absolute URL of the announcement's detail page.
    link: str
class BaseParser(abc.ABC):
    """Base class for the site-specific scrapers.

    Subclasses configure the search endpoint and implement the two
    ``process_*`` hooks; all orchestration (paging, deduplication against
    the SqliteDict, queueing of results) lives here.  The API is entirely
    classmethod-based — parsers are never instantiated.
    """

    # Search endpoint of the target site.
    BASE_SEARCH_LINK = ""
    # Constant query parameters sent with every search request.
    BASE_PARAMS = {}
    # Name of the query parameter that carries the page number.
    PAGE_PARAM = ""

    @classmethod
    @abc.abstractmethod
    def process_previews_page(cls, bs: BeautifulSoup) -> list[AnnoncementPreview]:
        """Extract preview entries from one search-results page.

        Marked abstract (the original was not): the placeholder body returned
        Ellipsis, which would crash the iteration in parse() if a subclass
        forgot to override it.
        """
        ...

    @classmethod
    @abc.abstractmethod
    def process_annoncement_data(cls, bs: BeautifulSoup, preview: AnnoncementPreview) -> Announcement:
        """Build a full Announcement from a parsed detail page."""
        ...

    @classmethod
    async def get_annoncement_by_preview(cls, preview: AnnoncementPreview) -> Optional[Announcement]:
        """Fetch and parse the detail page; return None on network failure."""
        print(f"Get annoncement by link: {preview.link} ...")
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(preview.link)
            # Transient network trouble: skip this item; the next parse
            # cycle will pick it up again.
            except (httpx.ConnectError, httpx.ReadTimeout, httpx.ConnectTimeout):
                return None
        bs = BeautifulSoup(response.text, features="html.parser")
        return cls.process_annoncement_data(bs, preview)

    @classmethod
    async def parse(cls, db: SqliteDict, queue: asyncio.Queue) -> None:
        """Walk search pages (at most 20), queueing today's unseen announcements.

        Stops early once the newest date seen on a page is two or more days
        old, on the assumption that results are sorted newest-first.
        """
        page = 1
        while page <= 20:
            params = {
                **cls.BASE_PARAMS,
                cls.PAGE_PARAM: page
            }
            print(f"Get {cls.__name__} page {page} previews...")
            async with httpx.AsyncClient() as client:
                try:
                    response = await client.get(cls.BASE_SEARCH_LINK, params=params)
                except httpx.ReadTimeout:
                    # Abandon this whole cycle; start_parse() retries later.
                    return
            bs = BeautifulSoup(response.text, features="html.parser")
            previews = cls.process_previews_page(bs)
            last_annoncement_date = None
            for preview in previews:
                if preview.update_date:
                    last_annoncement_date = preview.update_date
                # Skip listings known to be older than today (when the
                # search page exposes a date at all).
                if preview.update_date is not None and preview.update_date != date.today():
                    continue
                link_hash = hashlib.md5(preview.link.encode()).hexdigest()
                if db.get(link_hash, None) == date.today().isoformat():
                    # Already processed today — count it as fresh and move on.
                    last_annoncement_date = date.today()
                    continue
                annoncement = await cls.get_annoncement_by_preview(preview)
                if annoncement:
                    await queue.put(annoncement)
                    last_annoncement_date = annoncement.update_date
            page += 1
            if last_annoncement_date is None:
                continue
            if (date.today() - last_annoncement_date).days >= 2:
                break

    @classmethod
    async def start_parse(cls, db: SqliteDict, queue: asyncio.Queue) -> None:
        """Re-run parse() forever, pausing three minutes between cycles."""
        while True:
            await cls.parse(db, queue)
            await asyncio.sleep(180)

68
parsers/fzida.py Normal file
View File

@@ -0,0 +1,68 @@
from bs4 import BeautifulSoup
import dateparser
from .base import BaseParser, AnnoncementPreview, Announcement
class FzidaParser(BaseParser):
    """Scraper for 4zida.rs rental listings in Novi Sad."""

    BASE_LINK = "https://www.4zida.rs"
    BASE_SEARCH_LINK = "https://www.4zida.rs/izdavanje-stanova/novi-sad"
    BASE_PARAMS = {
        "jeftinije_od": "1000eur",
        "vece_od": "36m2",
        "namesteno": ["namesteno", "polunamesteno"],
        "sortiranje": "najnoviji"
    }
    PAGE_PARAM = "strana"

    # Ordered Serbian -> English substitutions applied to the relative-time
    # string before it is handed to dateparser.  The order is significant
    # (longer forms before their prefixes, then clean-up of the artifacts the
    # earlier replacements produce) and mirrors the original chain exactly.
    _TIME_WORD_SUBSTITUTIONS = (
        ("pre", "ago"),
        ("dan", "day"),
        ("minuta", "minute"),
        ("sati", "hour"),
        ("daya", "day"),
        ("sekunde", "second"),
        ("minut", "minute"),
        ("minutee", "minute"),
        ("sekundi", "second"),
        ("sat", "hour"),
        ("houra", "hour"),
        ("mesec", "month"),
        ("montha", "month"),
    )

    @classmethod
    def process_previews_page(cls, bs: BeautifulSoup) -> list[AnnoncementPreview]:
        """Collect previews from the Angular search-result cards."""
        previews: list[AnnoncementPreview] = []
        for card in bs.find_all("app-ad-search-preview"):
            heading = card.find("h3", {"class": "description"})
            anchor = card.find("a")
            previews.append(AnnoncementPreview(
                title=heading.text,
                link=cls.BASE_LINK + anchor.attrs["href"],
                update_date=None,  # only available on the detail page
            ))
        return previews

    @classmethod
    def process_annoncement_data(cls, bs: BeautifulSoup, preview: AnnoncementPreview) -> Announcement:
        """Parse a detail page; raises when the verification date is unreadable."""
        description_el = bs.find("pre", {"class": "ed-description collapsed-description ng-star-inserted"})
        price_el = bs.find("div", {"class": "prices"})
        verified_el = bs.find("app-info-item", {"label": "Oglas proveren"})
        raw_date_text = verified_el.find("strong", {"class": "value"}).text
        for serbian, english in cls._TIME_WORD_SUBSTITUTIONS:
            raw_date_text = raw_date_text.replace(serbian, english)
        update_date = dateparser.parse(raw_date_text)
        if update_date is None:
            raise Exception(f"Update_date from {raw_date_text}!")
        # The page has no separate title element; the description stands in.
        description = description_el.text if description_el else ""
        return Announcement(
            title=description,
            description=description,
            price=float(price_el.text.split("\xa0")[0].split(".")[0].replace(",", ".")),
            update_date=update_date.date(),
            link=preview.link
        )

64
parsers/halooglasi.py Normal file
View File

@@ -0,0 +1,64 @@
import json
from bs4 import BeautifulSoup
from dateutil.parser import parse
from .base import BaseParser, AnnoncementPreview, Announcement
class HalooglasiParser(BaseParser):
    """Scraper for halooglasi.com rental listings in Novi Sad."""

    BASE_LINK = "https://www.halooglasi.com"
    BASE_SEARCH_LINK = "https://www.halooglasi.com/nekretnine/izdavanje-stanova/novi-sad"
    BASE_PARAMS = {
        "cena_d_to": 1000,
        "cena_d_unit": 4,
        "kvadratura_d_from": 30,
        "kvadratura_d_unit": 1,
        "namestenost_id_l": "563,562"
    }
    PAGE_PARAM = "page"

    @classmethod
    def process_previews_page(cls, bs: BeautifulSoup) -> list[AnnoncementPreview]:
        """Collect previews from the product list of a search page."""
        container = bs.find("div", {"class": "row product-list"})
        previews: list[AnnoncementPreview] = []
        for card in container.find_all("div", {"class": "col-md-12 col-sm-12 col-xs-12 col-lg-12"}):
            heading = card.find("h3", {"class": "product-title"})
            published = card.find("span", {"class": "publish-date"})
            anchor = heading.find("a")
            previews.append(AnnoncementPreview(
                title=heading.text,
                # The publish-date text carries one trailing character, hence [:-1].
                update_date=parse(published.text[:-1]).date(),
                link=cls.BASE_LINK + anchor.attrs["href"],
            ))
        return previews

    @classmethod
    def process_annoncement_data(cls, bs: BeautifulSoup, preview: AnnoncementPreview) -> Announcement:
        """Extract the listing from the QuidditaEnvironment JS blob on the page."""
        pre_content = bs.find("div", {"class": "pre-content"})
        script_el = pre_content.contents[3].find("script")
        # The third script line is a JS assignment: strip the assignment
        # prefix and the trailing field-copying loop to leave bare JSON.
        payload = script_el.text.split("\r\n")[2]
        payload = payload.replace("\tQuidditaEnvironment.CurrentClassified=", "")
        payload = payload.replace("; for (var i in QuidditaEnvironment.CurrentClassified.OtherFields) { QuidditaEnvironment.CurrentClassified[i] = QuidditaEnvironment.CurrentClassified.OtherFields[i]; };", "")
        data = json.loads(payload)
        return Announcement(
            title=data["Title"],
            description=data["TextHtml"],
            price=data["OtherFields"]["cena_d"],
            update_date=parse(data["ValidFrom"]).date(),
            link=preview.link
        )

73
parsers/imovina.py Normal file
View File

@@ -0,0 +1,73 @@
from bs4 import BeautifulSoup
from dateutil.parser import parse
from .base import BaseParser, AnnoncementPreview, Announcement
class ImovinaParser(BaseParser):
    """Scraper for imovina.net rental offers (Novi Sad, Centar region)."""

    BASE_LINK = "https://imovina.net"
    BASE_SEARCH_LINK = "https://imovina.net/pretraga_nekretnina/izdavanje/"
    BASE_PARAMS = {
        "search": "TRA%8EI",
        "category[]": "2",
        "country": "SR",
        "mainRegion": "25",
        "region[]": "336",
        "regionName": "Centar Novi Sad",
        "offerTypeParent": "39",
        "priceFrom": "",
        "priceTo": "1000",
        "surfaceFrom": "30",
        "surfaceTo": "",
        "fastSearch": "TRAŽI",
        "offerType[]": ["5", "57", "65", "61", "1", "6", "8", "19", "58", "2", "59", "3", "60", "4"]
    }
    PAGE_PARAM = "page"

    @classmethod
    def process_previews_page(cls, bs: BeautifulSoup) -> list[AnnoncementPreview]:
        """Collect previews from the offers list of a search page."""
        result: list[AnnoncementPreview] = []
        list_view_el = bs.find("ul", {"class": "offers2"})
        for item in list_view_el.find_all("li"):
            # Rows that are not real offers have a different child count.
            if len(item.contents) != 4:
                continue
            link_el = item.contents[0]
            title_el = item.contents[2]
            result.append(AnnoncementPreview(
                title=title_el.text,
                # Drop the query string so the link (and its dedup hash) is stable.
                link=link_el.attrs["href"].split("?")[0],
                update_date=None,  # the list view exposes no date
            ))
        return result

    @classmethod
    def process_annoncement_data(cls, bs: BeautifulSoup, preview: AnnoncementPreview) -> Announcement:
        """Parse a detail page into an Announcement.

        The publish date is taken from the "offerPublished" paragraph, whose
        text embeds it between "dana " and " god".
        """
        offer_details_el = bs.find("div", {"id": "offerDetailsWrapper"})
        info_el = offer_details_el.find("div", {"id": "infoListId"})
        publish_info_el = offer_details_el.find("p", {"class": "offerPublished"})
        title_el = offer_details_el.find("h1")
        price_el = offer_details_el.find("div", {"id": "price_EURId"})
        description_el = info_el.contents[2]
        # Square-footage extraction, disabled together with the commented-out
        # `square` field below (the original ran this loop on every call and
        # then discarded the result):
        # offer_data_el = offer_details_el.find("dl", {"id": "offerData"})
        # square = ""
        # for content in offer_data_el.contents:
        #     if "Kvadratura m2:" in str(content):
        #         square = content.nextSibling.contents[0]
        return Announcement(
            title=title_el.text,
            # square=float(square),
            description=description_el.text,
            price=float(price_el.contents[0].replace(" ", "").replace("EUR", "")),
            update_date=parse(publish_info_el.text.split("dana ")[1].split(" god")[0]).date(),
            link=preview.link
        )

69
parsers/oglasi.py Normal file
View File

@@ -0,0 +1,69 @@
from bs4 import BeautifulSoup
from dateutil.parser import parse
from .base import BaseParser, AnnoncementPreview, Announcement
class OglasiParser(BaseParser):
    """Scraper for oglasi.rs rental listings in Novi Sad."""

    BASE_LINK = "https://www.oglasi.rs"
    BASE_SEARCH_LINK = "https://www.oglasi.rs/nekretnine/izdavanje-stanova/novi-sad"
    # Every acceptable "Kvadratura" bucket (30, 40, ..., 140 m2) is passed
    # explicitly; the comprehension generates exactly the original twelve
    # d[Kvadratura][i] parameters.
    BASE_PARAMS = {
        "s": "d",
        "pr[e]": "1000",
        "pr[c]": "EUR",
        **{f"d[Kvadratura][{i}]": str(30 + 10 * i) for i in range(12)},
    }
    PAGE_PARAM = "p"

    @classmethod
    def process_previews_page(cls, bs: BeautifulSoup) -> list[AnnoncementPreview]:
        """Collect previews from the advert cards of a search page."""
        previews: list[AnnoncementPreview] = []
        for card in bs.find_all("div", {"class": "fpogl-holder advert_list_item_normalan"}):
            name_el = card.find("h2", {"itemprop": "name"})
            time_el = card.find("time")
            anchor = card.find("a", {"class": "fpogl-list-title"})
            previews.append(AnnoncementPreview(
                title=name_el.text,
                update_date=parse(time_el.attrs["datetime"]).date(),
                link=cls.BASE_LINK + anchor.attrs["href"],
            ))
        return previews

    @classmethod
    def process_annoncement_data(cls, bs: BeautifulSoup, preview: AnnoncementPreview) -> Announcement:
        """Parse a detail page; the price falls back to -1.0 when absent."""
        description_el = bs.find("div", {"itemprop": "description"})
        price_el = bs.find("span", {"itemprop": "price"})
        time_el = bs.find("time")
        if price_el:
            # Only the part of the price text after the comma is used.
            price = float(price_el.text.split(",")[1])
        else:
            price = -1.0
        return Announcement(
            title=preview.title,
            description=description_el.text,
            price=price,
            update_date=parse(time_el.text).date(),
            link=preview.link
        )

57
parsers/sasomange.py Normal file
View File

@@ -0,0 +1,57 @@
from bs4 import BeautifulSoup
from dateutil.parser import parse
from .base import BaseParser, AnnoncementPreview, Announcement
class SasomangleParser(BaseParser):
    """Scraper for sasomange.rs rental listings in Novi Sad.

    NOTE(review): class name misspells the site name ("Sasomangle" vs
    "sasomange"); kept because parsers/__init__.py imports it by this name.
    """

    BASE_LINK = "https://sasomange.rs"
    BASE_SEARCH_LINK = "https://sasomange.rs/c/stanovi-iznajmljivanje/f/novi-sad"
    BASE_PARAMS = {
        "productsFacets.facets": "priceValue:(*-1000),facility_area_range_flat_rent:(36-*)"
    }
    PAGE_PARAM = "currentPage"

    @classmethod
    def process_previews_page(cls, bs: BeautifulSoup) -> list[AnnoncementPreview]:
        """Collect previews from the product cards of a search page."""
        listing = bs.find("ul", {"class": "list-view js-list-view-item"})
        previews: list[AnnoncementPreview] = []
        for card in listing.find_all("a", {"class": "product-item"}):
            name_el = card.find("h3", {"class": "name"})
            started_el = card.find("div", {"class": "start-date-content"})
            previews.append(AnnoncementPreview(
                title=name_el.text,
                # The date text carries one trailing character, hence [:-1].
                update_date=parse(started_el.text[:-1]).date(),
                link=cls.BASE_LINK + card.attrs["href"],
            ))
        return previews

    @classmethod
    def process_annoncement_data(cls, bs: BeautifulSoup, preview: AnnoncementPreview) -> Announcement:
        """Parse a detail page into an Announcement."""
        name_el = bs.find("h1", {"class": "name"})
        body_el = bs.find("div", {"class": "body-text-content"})
        price_el = bs.find("span", {"class": "price-content"})
        # The posting date sits next to the clock icon in the header.
        clock_icon = bs.find("em", {"class": "icon icon-clock"})
        posted_el = clock_icon.parent.find("span", {"class": "value"})
        posted_text = posted_el.text.strip()[:-1]
        return Announcement(
            title=name_el.text,
            description=body_el.text,
            price=float(price_el.text.split("\xa0")[0].split(".")[0].replace(",", ".")),
            update_date=parse(posted_text).date(),
            link=preview.link
        )

6
requirements.txt Normal file
View File

@@ -0,0 +1,6 @@
beautifulsoup4
python-dateutil
dateparser
sqlitedict
aiogram
httpx