diff --git a/poetry.lock b/poetry.lock index f9200be..839a22a 100644 --- a/poetry.lock +++ b/poetry.lock @@ -349,6 +349,109 @@ files = [ {file = "h11-0.14.0.tar.gz", hash = "sha256:8f19fbbe99e72420ff35c00b27a34cb9937e902a8b810e2c88300c6f0a3b699d"}, ] +[[package]] +name = "hiredis" +version = "3.0.0" +description = "Python wrapper for hiredis" +optional = false +python-versions = ">=3.8" +files = [ + {file = "hiredis-3.0.0-cp310-cp310-macosx_10_15_universal2.whl", hash = "sha256:4b182791c41c5eb1d9ed736f0ff81694b06937ca14b0d4dadde5dadba7ff6dae"}, + {file = "hiredis-3.0.0-cp310-cp310-macosx_10_15_x86_64.whl", hash = "sha256:13c275b483a052dd645eb2cb60d6380f1f5215e4c22d6207e17b86be6dd87ffa"}, + {file = "hiredis-3.0.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c1018cc7f12824506f165027eabb302735b49e63af73eb4d5450c66c88f47026"}, + {file = "hiredis-3.0.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:83a29cc7b21b746cb6a480189e49f49b2072812c445e66a9e38d2004d496b81c"}, + {file = "hiredis-3.0.0-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e241fab6332e8fb5f14af00a4a9c6aefa22f19a336c069b7ddbf28ef8341e8d6"}, + {file = "hiredis-3.0.0-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:1fb8de899f0145d6c4d5d4bd0ee88a78eb980a7ffabd51e9889251b8f58f1785"}, + {file = "hiredis-3.0.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b23291951959141173eec10f8573538e9349fa27f47a0c34323d1970bf891ee5"}, + {file = "hiredis-3.0.0-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e421ac9e4b5efc11705a0d5149e641d4defdc07077f748667f359e60dc904420"}, + {file = "hiredis-3.0.0-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:77c8006c12154c37691b24ff293c077300c22944018c3ff70094a33e10c1d795"}, + {file = "hiredis-3.0.0-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:41afc0d3c18b59eb50970479a9c0e5544fb4b95e3a79cf2fbaece6ddefb926fe"}, + {file = "hiredis-3.0.0-cp310-cp310-musllinux_1_2_ppc64le.whl", hash = "sha256:04ccae6dcd9647eae6025425ab64edb4d79fde8b9e6e115ebfabc6830170e3b2"}, + {file = "hiredis-3.0.0-cp310-cp310-musllinux_1_2_s390x.whl", hash = "sha256:fe91d62b0594db5ea7d23fc2192182b1a7b6973f628a9b8b2e0a42a2be721ac6"}, + {file = "hiredis-3.0.0-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:99516d99316062824a24d145d694f5b0d030c80da693ea6f8c4ecf71a251d8bb"}, + {file = "hiredis-3.0.0-cp310-cp310-win32.whl", hash = "sha256:562eaf820de045eb487afaa37e6293fe7eceb5b25e158b5a1974b7e40bf04543"}, + {file = "hiredis-3.0.0-cp310-cp310-win_amd64.whl", hash = "sha256:a1c81c89ed765198da27412aa21478f30d54ef69bf5e4480089d9c3f77b8f882"}, + {file = "hiredis-3.0.0-cp311-cp311-macosx_10_15_universal2.whl", hash = "sha256:4664dedcd5933364756d7251a7ea86d60246ccf73a2e00912872dacbfcef8978"}, + {file = "hiredis-3.0.0-cp311-cp311-macosx_10_15_x86_64.whl", hash = "sha256:47de0bbccf4c8a9f99d82d225f7672b9dd690d8fd872007b933ef51a302c9fa6"}, + {file = "hiredis-3.0.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:e43679eca508ba8240d016d8cca9d27342d70184773c15bea78a23c87a1922f1"}, + {file = "hiredis-3.0.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:13c345e7278c210317e77e1934b27b61394fee0dec2e8bd47e71570900f75823"}, + {file = "hiredis-3.0.0-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:00018f22f38530768b73ea86c11f47e8d4df65facd4e562bd78773bd1baef35e"}, + {file = "hiredis-3.0.0-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:4ea3a86405baa8eb0d3639ced6926ad03e07113de54cb00fd7510cb0db76a89d"}, + {file = "hiredis-3.0.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c073848d2b1d5561f3903879ccf4e1a70c9b1e7566c7bdcc98d082fa3e7f0a1d"}, + {file = "hiredis-3.0.0-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:5a8dffb5f5b3415a4669d25de48b617fd9d44b0bccfc4c2ab24b06406ecc9ecb"}, + {file = "hiredis-3.0.0-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:22c17c96143c2a62dfd61b13803bc5de2ac526b8768d2141c018b965d0333b66"}, + {file = "hiredis-3.0.0-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:c3ece960008dab66c6b8bb3a1350764677ee7c74ccd6270aaf1b1caf9ccebb46"}, + {file = "hiredis-3.0.0-cp311-cp311-musllinux_1_2_ppc64le.whl", hash = "sha256:f75999ae00a920f7dce6ecae76fa5e8674a3110e5a75f12c7a2c75ae1af53396"}, + {file = "hiredis-3.0.0-cp311-cp311-musllinux_1_2_s390x.whl", hash = "sha256:e069967cbd5e1900aafc4b5943888f6d34937fc59bf8918a1a546cb729b4b1e4"}, + {file = "hiredis-3.0.0-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:0aacc0a78e1d94d843a6d191f224a35893e6bdfeb77a4a89264155015c65f126"}, + {file = "hiredis-3.0.0-cp311-cp311-win32.whl", hash = "sha256:719c32147ba29528cb451f037bf837dcdda4ff3ddb6cdb12c4216b0973174718"}, + {file = "hiredis-3.0.0-cp311-cp311-win_amd64.whl", hash = "sha256:bdc144d56333c52c853c31b4e2e52cfbdb22d3da4374c00f5f3d67c42158970f"}, + {file = "hiredis-3.0.0-cp312-cp312-macosx_10_15_universal2.whl", hash = "sha256:484025d2eb8f6348f7876fc5a2ee742f568915039fcb31b478fd5c242bb0fe3a"}, + {file = "hiredis-3.0.0-cp312-cp312-macosx_10_15_x86_64.whl", hash = "sha256:fcdb552ffd97151dab8e7bc3ab556dfa1512556b48a367db94b5c20253a35ee1"}, + {file = "hiredis-3.0.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:0bb6f9fd92f147ba11d338ef5c68af4fd2908739c09e51f186e1d90958c68cc1"}, + {file = "hiredis-3.0.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fa86bf9a0ed339ec9e8a9a9d0ae4dccd8671625c83f9f9f2640729b15e07fbfd"}, + {file = "hiredis-3.0.0-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e194a0d5df9456995d8f510eab9f529213e7326af6b94770abf8f8b7952ddcaa"}, + {file = "hiredis-3.0.0-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:c8a1df39d74ec507d79c7a82c8063eee60bf80537cdeee652f576059b9cdd15c"}, + {file = "hiredis-3.0.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f91456507427ba36fd81b2ca11053a8e112c775325acc74e993201ea912d63e9"}, + {file = "hiredis-3.0.0-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9862db92ef67a8a02e0d5370f07d380e14577ecb281b79720e0d7a89aedb9ee5"}, + {file = "hiredis-3.0.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:d10fcd9e0eeab835f492832b2a6edb5940e2f1230155f33006a8dfd3bd2c94e4"}, + {file = "hiredis-3.0.0-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:48727d7d405d03977d01885f317328dc21d639096308de126c2c4e9950cbd3c9"}, + {file = "hiredis-3.0.0-cp312-cp312-musllinux_1_2_ppc64le.whl", hash = "sha256:8e0bb6102ebe2efecf8a3292c6660a0e6fac98176af6de67f020bea1c2343717"}, + {file = "hiredis-3.0.0-cp312-cp312-musllinux_1_2_s390x.whl", hash = "sha256:df274e3abb4df40f4c7274dd3e587dfbb25691826c948bc98d5fead019dfb001"}, + {file = "hiredis-3.0.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:034925b5fb514f7b11aac38cd55b3fd7e9d3af23bd6497f3f20aa5b8ba58e232"}, + {file = "hiredis-3.0.0-cp312-cp312-win32.whl", hash = "sha256:120f2dda469b28d12ccff7c2230225162e174657b49cf4cd119db525414ae281"}, + {file = "hiredis-3.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:e584fe5f4e6681d8762982be055f1534e0170f6308a7a90f58d737bab12ff6a8"}, + {file = "hiredis-3.0.0-cp38-cp38-macosx_10_15_universal2.whl", hash = "sha256:122171ff47d96ed8dd4bba6c0e41d8afaba3e8194949f7720431a62aa29d8895"}, + {file = "hiredis-3.0.0-cp38-cp38-macosx_10_15_x86_64.whl", hash = "sha256:ba9fc605ac558f0de67463fb588722878641e6fa1dabcda979e8e69ff581d0bd"}, + {file = "hiredis-3.0.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:a631e2990b8be23178f655cae8ac6c7422af478c420dd54e25f2e26c29e766f1"}, + {file = "hiredis-3.0.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:63482db3fadebadc1d01ad33afa6045ebe2ea528eb77ccaabd33ee7d9c2bad48"}, + {file = "hiredis-3.0.0-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:1f669212c390eebfbe03c4e20181f5970b82c5d0a0ad1df1785f7ffbe7d61150"}, + {file = "hiredis-3.0.0-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:a6a49ef161739f8018c69b371528bdb47d7342edfdee9ddc75a4d8caddf45a6e"}, + {file = "hiredis-3.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:98a152052b8878e5e43a2e3a14075218adafc759547c98668a21e9485882696c"}, + {file = "hiredis-3.0.0-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:50a196af0ce657fcde9bf8a0bbe1032e22c64d8fcec2bc926a35e7ff68b3a166"}, + {file = "hiredis-3.0.0-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:f2f312eef8aafc2255e3585dcf94d5da116c43ef837db91db9ecdc1bc930072d"}, + {file = "hiredis-3.0.0-cp38-cp38-musllinux_1_2_i686.whl", hash = "sha256:6ca41fa40fa019cde42c21add74aadd775e71458051a15a352eabeb12eb4d084"}, + {file = "hiredis-3.0.0-cp38-cp38-musllinux_1_2_ppc64le.whl", hash = "sha256:6eecb343c70629f5af55a8b3e53264e44fa04e155ef7989de13668a0cb102a90"}, + {file = "hiredis-3.0.0-cp38-cp38-musllinux_1_2_s390x.whl", hash = "sha256:c3fdad75e7837a475900a1d3a5cc09aa024293c3b0605155da2d42f41bc0e482"}, + {file = "hiredis-3.0.0-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:8854969e7480e8d61ed7549eb232d95082a743e94138d98d7222ba4e9f7ecacd"}, + {file = "hiredis-3.0.0-cp38-cp38-win32.whl", hash = "sha256:f114a6c86edbf17554672b050cce72abf489fe58d583c7921904d5f1c9691605"}, + {file = "hiredis-3.0.0-cp38-cp38-win_amd64.whl", hash = "sha256:7d99b91e42217d7b4b63354b15b41ce960e27d216783e04c4a350224d55842a4"}, + {file = "hiredis-3.0.0-cp39-cp39-macosx_10_15_universal2.whl", hash = "sha256:4c6efcbb5687cf8d2aedcc2c3ed4ac6feae90b8547427d417111194873b66b06"}, + {file = "hiredis-3.0.0-cp39-cp39-macosx_10_15_x86_64.whl", hash = "sha256:5b5cff42a522a0d81c2ae7eae5e56d0ee7365e0c4ad50c4de467d8957aff4414"}, + {file = "hiredis-3.0.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:82f794d564f4bc76b80c50b03267fe5d6589e93f08e66b7a2f674faa2fa76ebc"}, + {file = "hiredis-3.0.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d7a4c1791d7aa7e192f60fe028ae409f18ccdd540f8b1e6aeb0df7816c77e4a4"}, + {file = "hiredis-3.0.0-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a2537b2cd98192323fce4244c8edbf11f3cac548a9d633dbbb12b48702f379f4"}, + {file = "hiredis-3.0.0-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:8fed69bbaa307040c62195a269f82fc3edf46b510a17abb6b30a15d7dab548df"}, + {file = "hiredis-3.0.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:869f6d5537d243080f44253491bb30aa1ec3c21754003b3bddeadedeb65842b0"}, + {file = "hiredis-3.0.0-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d435ae89073d7cd51e6b6bf78369c412216261c9c01662e7008ff00978153729"}, + {file = "hiredis-3.0.0-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:204b79b30a0e6be0dc2301a4d385bb61472809f09c49f400497f1cdd5a165c66"}, + {file = "hiredis-3.0.0-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:3ea635101b739c12effd189cc19b2671c268abb03013fd1f6321ca29df3ca625"}, + {file = "hiredis-3.0.0-cp39-cp39-musllinux_1_2_ppc64le.whl", hash = "sha256:f359175197fd833c8dd7a8c288f1516be45415bb5c939862ab60c2918e1e1943"}, + {file = "hiredis-3.0.0-cp39-cp39-musllinux_1_2_s390x.whl", hash = "sha256:ac6d929cb33dd12ad3424b75725975f0a54b5b12dbff95f2a2d660c510aa106d"}, + {file = "hiredis-3.0.0-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:100431e04d25a522ef2c3b94f294c4219c4de3bfc7d557b6253296145a144c11"}, + {file = "hiredis-3.0.0-cp39-cp39-win32.whl", hash = "sha256:e1a9c14ae9573d172dc050a6f63a644457df5d01ec4d35a6a0f097f812930f83"}, + {file = "hiredis-3.0.0-cp39-cp39-win_amd64.whl", hash = "sha256:54a6dd7b478e6eb01ce15b3bb5bf771e108c6c148315bf194eb2ab776a3cac4d"}, + {file = "hiredis-3.0.0-pp310-pypy310_pp73-macosx_10_15_x86_64.whl", hash = "sha256:50da7a9edf371441dfcc56288d790985ee9840d982750580710a9789b8f4a290"}, + {file = "hiredis-3.0.0-pp310-pypy310_pp73-macosx_11_0_arm64.whl", hash = "sha256:9b285ef6bf1581310b0d5e8f6ce64f790a1c40e89c660e1320b35f7515433672"}, + {file = "hiredis-3.0.0-pp310-pypy310_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0dcfa684966f25b335072115de2f920228a3c2caf79d4bfa2b30f6e4f674a948"}, + {file = "hiredis-3.0.0-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a41be8af1fd78ca97bc948d789a09b730d1e7587d07ca53af05758f31f4b985d"}, + {file = "hiredis-3.0.0-pp310-pypy310_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:038756db735e417ab36ee6fd7725ce412385ed2bd0767e8179a4755ea11b804f"}, + {file = "hiredis-3.0.0-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:fcecbd39bd42cef905c0b51c9689c39d0cc8b88b1671e7f40d4fb213423aef3a"}, + {file = "hiredis-3.0.0-pp38-pypy38_pp73-macosx_10_15_x86_64.whl", hash = "sha256:a131377493a59fb0f5eaeb2afd49c6540cafcfba5b0b3752bed707be9e7c4eaf"}, + {file = "hiredis-3.0.0-pp38-pypy38_pp73-macosx_11_0_arm64.whl", hash = "sha256:3d22c53f0ec5c18ecb3d92aa9420563b1c5d657d53f01356114978107b00b860"}, + {file = "hiredis-3.0.0-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c8a91e9520fbc65a799943e5c970ffbcd67905744d8becf2e75f9f0a5e8414f0"}, + {file = "hiredis-3.0.0-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3dc8043959b50141df58ab4f398e8ae84c6f9e673a2c9407be65fc789138f4a6"}, + {file = "hiredis-3.0.0-pp38-pypy38_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:51b99cfac514173d7b8abdfe10338193e8a0eccdfe1870b646009d2fb7cbe4b5"}, + {file = "hiredis-3.0.0-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:fa1fcad89d8a41d8dc10b1e54951ec1e161deabd84ed5a2c95c3c7213bdb3514"}, + {file = "hiredis-3.0.0-pp39-pypy39_pp73-macosx_10_15_x86_64.whl", hash = "sha256:898636a06d9bf575d2c594129085ad6b713414038276a4bfc5db7646b8a5be78"}, + {file = "hiredis-3.0.0-pp39-pypy39_pp73-macosx_11_0_arm64.whl", hash = "sha256:466f836dbcf86de3f9692097a7a01533dc9926986022c6617dc364a402b265c5"}, + {file = "hiredis-3.0.0-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:23142a8af92a13fc1e3f2ca1d940df3dcf2af1d176be41fe8d89e30a837a0b60"}, + {file = "hiredis-3.0.0-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:793c80a3d6b0b0e8196a2d5de37a08330125668c8012922685e17aa9108c33ac"}, + {file = "hiredis-3.0.0-pp39-pypy39_pp73-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:467d28112c7faa29b7db743f40803d927c8591e9da02b6ce3d5fadc170a542a2"}, + {file = "hiredis-3.0.0-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:dc384874a719c767b50a30750f937af18842ee5e288afba95a5a3ed703b1515a"}, + {file = "hiredis-3.0.0.tar.gz", hash = "sha256:fed8581ae26345dea1f1e0d1a96e05041a727a45e7d8d459164583e23c6ac441"}, +] + [[package]] name = "httpcore" version = "1.0.5" @@ -932,6 +1035,7 @@ files = [ [package.dependencies] async-timeout = {version = ">=4.0.3", markers = "python_full_version < \"3.11.3\""} +hiredis = {version = ">=3.0.0", optional = true, markers = "extra == \"hiredis\""} [package.extras] hiredis = ["hiredis (>=3.0.0)"] @@ -1179,4 +1283,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "3724cbb8df43cb95838fe9710c7baf48ba667b605a01ff81054929b7fca1bf9f" +content-hash = "c5dc6b521c7e43f2a68b9a5e7c86e8f188c09ddefeb9771a1143504fa8109e4e" diff --git a/pyproject.toml b/pyproject.toml index 480c046..47750ba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ pytz = "^2024.2" mongojet = "^0.2.4" taskiq = "^0.11.7" taskiq-redis = "^1.0.2" +redis = {extras = ["hiredis"], version = "^5.2.0"} [build-system] diff --git a/src/core/redis.py b/src/core/redis.py new file mode 100644 index 0000000..791095a --- /dev/null +++ b/src/core/redis.py @@ -0,0 +1,33 @@ +import contextlib + +from redis.asyncio import from_url + +from core.config import config + + +def create_redis_pool(): + return from_url(config.REDIS_URI) + + +class RedisSessionManager: + def __init__(self): + self.pool = None + + async def init(self): + self.pool = await create_redis_pool() + + async def close(self): + if self.pool is not None: + await self.pool.close() + + @contextlib.asynccontextmanager + async def connect(self): + if self.pool is None: + await self.init() + + assert self.pool is not None + + yield self.pool + + +redis_manager = RedisSessionManager() diff --git a/src/main.py b/src/main.py index 7482c14..25d398a 100644 --- a/src/main.py +++ b/src/main.py @@ -5,6 +5,7 @@ from modules.games_list import start as start_games_list_module from modules.stream_notifications import start as start_stream_notifications_module from core.mongo import mongo_manager +from core.redis import redis_manager logging.basicConfig(level=logging.INFO) @@ -17,6 +18,7 @@ async def main(): logger.info("Starting services...") await mongo_manager.init() + await redis_manager.init() await wait([ create_task(start_games_list_module()), diff --git a/src/modules/stream_notifications/__init__.py b/src/modules/stream_notifications/__init__.py index ee583fb..c490889 100644 --- a/src/modules/stream_notifications/__init__.py +++ b/src/modules/stream_notifications/__init__.py @@ -1,4 +1,4 @@ -from .twitch.twitch import start_twitch_service +from .twitch.webhook import start_twitch_service start = start_twitch_service diff --git a/src/modules/stream_notifications/notification.py b/src/modules/stream_notifications/notification.py index 18da6e0..5645be2 100644 --- a/src/modules/stream_notifications/notification.py +++ b/src/modules/stream_notifications/notification.py @@ -6,7 +6,8 @@ from httpx import AsyncClient from core.config import config from domain.streamers import StreamerConfig -from .twitch.state import State +from .state import State +from .sent_notifications import SentNotificationType logger = logging.getLogger(__name__) @@ -48,14 +49,16 @@ def get_role_id(streamer_config: StreamerConfig, category: str) -> int | None: return roles.get(category) -async def notify(notification_type: Literal["start"] | Literal["change_category"], streamer_config: StreamerConfig, current_state: State): - if notification_type == "start": +async def notify(notification_type: SentNotificationType, streamer_config: StreamerConfig, current_state: State) -> dict[str, bool]: + result: dict[str, bool] = {} + + if notification_type == SentNotificationType.START_STREAM: message_template = streamer_config.notifications.start_stream else: message_template = streamer_config.notifications.change_category if message_template is None: - return + return result integrations = streamer_config.integrations @@ -69,7 +72,9 @@ async def notify(notification_type: Literal["start"] | Literal["change_category" try: await notify_telegram(msg, str(telegram.notifications_channel_id)) + result["telegram"] = True except Exception as e: + result["telegram"] = False logger.error("Failed to notify telegram", exc_info=e) if (discord := integrations.discord) is not None: @@ -90,5 +95,9 @@ async def notify(notification_type: Literal["start"] | Literal["change_category" try: await notify_discord(msg, str(discord.notifications_channel_id)) + result["discord"] = True except Exception as e: + result["discord"] = False logger.error("Failed to notify discord", exc_info=e) + + return result diff --git a/src/modules/stream_notifications/sent_notifications.py b/src/modules/stream_notifications/sent_notifications.py new file mode 100644 index 0000000..747cdaa --- /dev/null +++ b/src/modules/stream_notifications/sent_notifications.py @@ -0,0 +1,64 @@ +from enum import StrEnum +from datetime import datetime, timezone + +from pydantic import BaseModel + +from core.mongo import mongo_manager + +from .state import State + + +class SentNotificationType(StrEnum): + START_STREAM = "start_stream" + CHANGE_CATEGORY = "change_category" + + +class SentNotification(BaseModel): + notification_type: SentNotificationType + twitch_id: int + state: State + sent_result: dict[str, bool] + sent_at: datetime + + +class SentNotificationRepository: + COLLECTION_NAME = "sent_notifications" + + @classmethod + async def add( + cls, + twitch_id: int, + notification_type: SentNotificationType, + state: State, + sent_result: dict[str, bool], + ): + async with mongo_manager.connect() as client: + db = client.get_default_database() + collection = db[cls.COLLECTION_NAME] + + await collection.insert_one( + SentNotification( + notification_type=notification_type, + twitch_id=twitch_id, + state=state, + sent_at=datetime.now(timezone.utc), + sent_result=sent_result, + ).model_dump() + ) + + @classmethod + async def get_last_for_streamer( + cls, twitch_id: int + ) -> SentNotification | None: + async with mongo_manager.connect() as client: + db = client.get_default_database() + collection = db[cls.COLLECTION_NAME] + + doc = await collection.find_one( + {"twitch_id": twitch_id}, + sort={"sent_at": -1}, + ) + if doc is None: + return None + + return SentNotification(**doc) diff --git a/src/modules/stream_notifications/state.py b/src/modules/stream_notifications/state.py new file mode 100644 index 0000000..0cffbb7 --- /dev/null +++ b/src/modules/stream_notifications/state.py @@ -0,0 +1,40 @@ +from datetime import datetime + +from pydantic import BaseModel + +from core.mongo import mongo_manager + + +class State(BaseModel): + title: str + category: str + + last_live_at: datetime + + +class StateManager: + COLLECTION_NAME = "stream_twitch_state" + + @classmethod + async def get(cls, twitch_id: int) -> State | None: + async with mongo_manager.connect() as client: + db = client.get_default_database() + collection = db[cls.COLLECTION_NAME] + + state = await collection.find_one({"twitch_id": twitch_id}) + if state is None: + return None + + return State(**state) + + @classmethod + async def update(cls, twitch_id: int, state: State): + async with mongo_manager.connect() as client: + db = client.get_default_database() + collection = db[cls.COLLECTION_NAME] + + await collection.update_one( + {"twitch_id": twitch_id}, + {"$set": state.model_dump()}, + upsert=True + ) diff --git a/src/modules/stream_notifications/tasks.py b/src/modules/stream_notifications/tasks.py new file mode 100644 index 0000000..6912ec6 --- /dev/null +++ b/src/modules/stream_notifications/tasks.py @@ -0,0 +1,8 @@ +from core.broker import broker + +from .watcher import StateWatcher + + +@broker.task("stream_notifications.twitch.on_stream_state_change") +async def on_stream_state_change(streamer_id: int): + await StateWatcher.on_stream_state_change(streamer_id) diff --git a/src/modules/stream_notifications/twitch/authorize.py b/src/modules/stream_notifications/twitch/authorize.py new file mode 100644 index 0000000..277a42a --- /dev/null +++ b/src/modules/stream_notifications/twitch/authorize.py @@ -0,0 +1,34 @@ +from twitchAPI.twitch import Twitch +from twitchAPI.type import AuthScope + +from core.config import config + +from .token_storage import TokenStorage + + +SCOPES = [ + AuthScope.CHAT_READ, + AuthScope.CHAT_EDIT, +] + + +async def authorize(auto_refresh_auth: bool = False) -> Twitch: + twitch = Twitch( + config.TWITCH_CLIENT_ID, + config.TWITCH_CLIENT_SECRET + ) + + twitch.user_auth_refresh_callback = TokenStorage.save + twitch.auto_refresh_auth = auto_refresh_auth + + token, refresh_token = await TokenStorage.get() + await twitch.set_user_authentication( + token, + SCOPES, + refresh_token=refresh_token if auto_refresh_auth else None, + validate=True + ) + + await twitch.authenticate_app(SCOPES) + + return twitch diff --git a/src/modules/stream_notifications/twitch/state.py b/src/modules/stream_notifications/twitch/state.py deleted file mode 100644 index 4e5472e..0000000 --- a/src/modules/stream_notifications/twitch/state.py +++ /dev/null @@ -1,10 +0,0 @@ -from datetime import datetime - -from pydantic import BaseModel - - -class State(BaseModel): - title: str - category: str - - last_live_at: datetime diff --git a/src/modules/stream_notifications/twitch/twitch.py b/src/modules/stream_notifications/twitch/twitch.py deleted file mode 100644 index 4b3cee8..0000000 --- a/src/modules/stream_notifications/twitch/twitch.py +++ /dev/null @@ -1,200 +0,0 @@ -from asyncio import Lock, sleep -from datetime import datetime -import logging - -from twitchAPI.helper import first -from twitchAPI.eventsub.webhook import EventSubWebhook -from twitchAPI.twitch import Twitch -from twitchAPI.type import AuthScope -from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent - -from core.config import config -from modules.stream_notifications.notification import notify -from repositories.streamers import StreamerConfigRepository - -from .state import State -from .token_storage import TokenStorage - - -logger = logging.getLogger(__name__) - - -class TwitchService: - lock = Lock() - - SCOPES = [ - AuthScope.CHAT_READ, - AuthScope.CHAT_EDIT, - ] - - ONLINE_NOTIFICATION_DELAY = 15 * 60 - UPDATE_DELAY = 5 * 60 - - def __init__(self, twitch: Twitch): - self.twitch = twitch - - self.state: dict[int, State | None] = {} - - @classmethod - async def authorize(cls): - twitch = Twitch( - config.TWITCH_CLIENT_ID, - config.TWITCH_CLIENT_SECRET - ) - - twitch.user_auth_refresh_callback = TokenStorage.save - - token, refresh_token = await TokenStorage.get() - await twitch.set_user_authentication(token, cls.SCOPES, refresh_token) - - await twitch.authenticate_app(cls.SCOPES) - - return twitch - - async def notify_online(self, streamer_id: int): - current_state = self.state.get(streamer_id) - if current_state is None: - raise RuntimeError("State is None") - - streamer = await StreamerConfigRepository.get_by_twitch_id(streamer_id) - - if streamer.notifications.start_stream is None: - return - - await notify("start", streamer, current_state) - - async def notify_change_category(self, streamer_id: int): - current_state = self.state.get(streamer_id) - - if current_state is None: - raise RuntimeError("State is None") - - if (datetime.now() - current_state.last_live_at).seconds >= self.ONLINE_NOTIFICATION_DELAY: - return - - streamer = await StreamerConfigRepository.get_by_twitch_id(streamer_id) - - if streamer.notifications.change_category is None: - return - - await notify("change_category", streamer, current_state) - - async def get_current_stream(self, streamer_id: int, retry_count: int = 5, delay: int = 5): - remain_retry = retry_count - - while remain_retry > 0: - stream = await first(self.twitch.get_streams(user_id=[str(streamer_id)])) - - if stream is not None: - return stream - - remain_retry -= 1 - await sleep(delay) - - return None - - async def on_channel_update(self, event: ChannelUpdateEvent): - brodcaster_id = int(event.event.broadcaster_user_id) - - stream = await self.get_current_stream(brodcaster_id) - if stream is None: - return - - async with self.lock: - current_state = self.state.get(brodcaster_id) - if current_state is None: - return - - changed = current_state.category != event.event.category_name - - current_state.title = event.event.title - current_state.category = event.event.category_name - current_state.last_live_at = datetime.now() - - self.state[brodcaster_id] = current_state - - if changed: - await self.notify_change_category(brodcaster_id) - - async def _on_stream_online(self, streamer_id: int): - current_stream = await self.get_current_stream(streamer_id) - if current_stream is None: - return - - state = State( - title=current_stream.title, - category=current_stream.game_name, - last_live_at=datetime.now() - ) - - async with self.lock: - current_state = self.state.get(streamer_id) - - is_need_notify = current_state is None or (datetime.now() - current_state.last_live_at).seconds >= self.ONLINE_NOTIFICATION_DELAY - - self.state[streamer_id] = state - - if is_need_notify: - await self.notify_online(streamer_id) - - async def on_stream_online(self, event: StreamOnlineEvent): - await self._on_stream_online(int(event.event.broadcaster_user_id)) - - async def run(self): - eventsub = EventSubWebhook( - callback_url=config.TWITCH_CALLBACK_URL, - port=config.TWITCH_CALLBACK_PORT, - twitch=self.twitch, - message_deduplication_history_length=50 - ) - - streamers = await StreamerConfigRepository.all() - - for streamer in streamers: - current_stream = await self.get_current_stream(streamer.twitch.id) - - if current_stream: - self.state[streamer.twitch.id] = State( - title=current_stream.title, - category=current_stream.game_name, - last_live_at=datetime.now() - ) - else: - self.state[streamer.twitch.id] = None - - try: - await eventsub.unsubscribe_all() - - eventsub.start() - - logger.info("Subscribe to events...") - - for streamer in streamers: - logger.info(f"Subscribe to events for {streamer.twitch.name}") - await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update) - await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online) - logger.info(f"Subscribe to events for {streamer.twitch.name} done") - - logger.info("Twitch service started") - - while True: - await sleep(self.UPDATE_DELAY) - - for streamer in streamers: - await self._on_stream_online(streamer.twitch.id) - finally: - await eventsub.stop() - await self.twitch.close() - - raise RuntimeError("Twitch service stopped") - - @classmethod - async def start(cls): - logger.info("Starting Twitch service...") - - twith = await cls.authorize() - await cls(twith).run() - - -async def start_twitch_service(): - await TwitchService.start() diff --git a/src/modules/stream_notifications/twitch/webhook.py b/src/modules/stream_notifications/twitch/webhook.py new file mode 100644 index 0000000..bf41b0d --- /dev/null +++ b/src/modules/stream_notifications/twitch/webhook.py @@ -0,0 +1,73 @@ +from asyncio import sleep +import logging +from typing import NoReturn + +from twitchAPI.eventsub.webhook import EventSubWebhook +from twitchAPI.twitch import Twitch +from twitchAPI.object.eventsub import StreamOnlineEvent, ChannelUpdateEvent + +from core.config import config +from repositories.streamers import StreamerConfigRepository +from modules.stream_notifications.tasks import on_stream_state_change + +from .authorize import authorize + + +logger = logging.getLogger(__name__) + + +class TwitchService: + ONLINE_NOTIFICATION_DELAY = 15 * 60 + + def __init__(self, twitch: Twitch): + self.twitch = twitch + + async def on_channel_update(self, event: ChannelUpdateEvent): + await on_stream_state_change.kiq(int(event.event.broadcaster_user_id)) + + async def on_stream_online(self, event: StreamOnlineEvent): + await on_stream_state_change.kiq(int(event.event.broadcaster_user_id)) + + async def run(self) -> NoReturn: + eventsub = EventSubWebhook( + callback_url=config.TWITCH_CALLBACK_URL, + port=config.TWITCH_CALLBACK_PORT, + twitch=self.twitch, + message_deduplication_history_length=50 + ) + + streamers = await StreamerConfigRepository.all() + + try: + await eventsub.unsubscribe_all() + + eventsub.start() + + logger.info("Subscribe to events...") + + for streamer in streamers: + logger.info(f"Subscribe to events for {streamer.twitch.name}") + await eventsub.listen_channel_update_v2(str(streamer.twitch.id), self.on_channel_update) + await eventsub.listen_stream_online(str(streamer.twitch.id), self.on_stream_online) + logger.info(f"Subscribe to events for {streamer.twitch.name} done") + + logger.info("Twitch service started") + + while True: + await sleep(0.1) + finally: + await eventsub.stop() + await self.twitch.close() + + raise RuntimeError("Twitch service stopped") + + @classmethod + async def start(cls): + logger.info("Starting Twitch service...") + + twith = await authorize(auto_refresh_auth=True) + await cls(twith).run() + + +async def start_twitch_service() -> NoReturn: + await TwitchService.start() diff --git a/src/modules/stream_notifications/watcher.py b/src/modules/stream_notifications/watcher.py new file mode 100644 index 0000000..f5e79c8 --- /dev/null +++ b/src/modules/stream_notifications/watcher.py @@ -0,0 +1,95 @@ +from datetime import datetime, timezone, timedelta + +from twitchAPI.helper import first + +from core.redis import redis_manager +from repositories.streamers import StreamerConfigRepository + +from .state import State, StateManager +from .sent_notifications import SentNotificationRepository, SentNotificationType +from .notification import notify +from .twitch.authorize import authorize + + +class StateWatcher: + START_STREAM_THRESHOLD = timedelta(minutes=15) + + @classmethod + async def get_twitch_state(cls, streamer_id: int) -> State | None: + twitch = await authorize() + + stream = await first( + twitch.get_streams(user_id=[str(streamer_id)]) + ) + + if stream is None: + return None + + return State( + title=stream.title, + category=stream.game_name, + last_live_at=datetime.now(timezone.utc) + ) + + @classmethod + async def notify_and_save( + cls, + streamer_id: int, + sent_notification_type: SentNotificationType, + state: State + ): + streamer = await StreamerConfigRepository.get_by_twitch_id(streamer_id) + + sent_result = await notify(sent_notification_type, streamer, state) + await SentNotificationRepository.add( + streamer.twitch.id, + sent_notification_type, + state, + sent_result=sent_result + ) + + @classmethod + async def notify_start_stream( + cls, + streamer_id: int, + state: State + ): + await cls.notify_and_save(streamer_id, SentNotificationType.START_STREAM, state) + + @classmethod + async def notify_change_category( + cls, + streamer_id: int, + state: State + ): + await cls.notify_and_save(streamer_id, SentNotificationType.CHANGE_CATEGORY, state) + + @classmethod + async def _on_stream_state_change(cls, streamer_id: int): + current_state = await cls.get_twitch_state(streamer_id) + if current_state is None: + return + + last_state = await StateManager.get(streamer_id) + if last_state is None: + await cls.notify_start_stream(streamer_id, current_state) + await StateManager.update(streamer_id, current_state) + return + + if datetime.now(timezone.utc) - last_state.last_live_at > cls.START_STREAM_THRESHOLD: + await cls.notify_start_stream(streamer_id, current_state) + await StateManager.update(streamer_id, current_state) + return + + if last_state.category != current_state.category: + await cls.notify_change_category(streamer_id, current_state) + await StateManager.update(streamer_id, current_state) + return + + await StateManager.update(streamer_id, current_state) + + @classmethod + async def on_stream_state_change(cls, streamer_id: int): + async with redis_manager.connect() as redis: + async with redis.lock(f"on_stream_state_change:{streamer_id}"): + await cls._on_stream_state_change(streamer_id) diff --git a/src/modules/tasks.py b/src/modules/tasks.py index af21fd0..1765381 100644 --- a/src/modules/tasks.py +++ b/src/modules/tasks.py @@ -1 +1,2 @@ from modules.scheduler_sync.tasks import * # noqa: F403 +from modules.stream_notifications.tasks import * # noqa: F403