diff --git a/l10n_it_fatturapa_govway/__init__.py b/l10n_it_fatturapa_govway/__init__.py new file mode 100644 index 000000000000..91c5580fed36 --- /dev/null +++ b/l10n_it_fatturapa_govway/__init__.py @@ -0,0 +1,2 @@ +from . import controllers +from . import models diff --git a/l10n_it_fatturapa_govway/__manifest__.py b/l10n_it_fatturapa_govway/__manifest__.py new file mode 100644 index 000000000000..ca2ff1cb6c2d --- /dev/null +++ b/l10n_it_fatturapa_govway/__manifest__.py @@ -0,0 +1,28 @@ +# Copyright 2024 Sergio Corato +# Copyright 2024 Marco Colombo +# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). +{ + "name": "ITA - Fattura elettronica - Supporto GovWay", + "version": "14.0.1.0.0", + "category": "Localization/Italy", + "summary": "Invio fatture elettroniche tramite GovWay", + "author": "Odoo Community Association (OCA)", + "website": "https://github.com/OCA/l10n-italy", + "license": "AGPL-3", + "depends": [ + "l10n_it_fatturapa_out", + "l10n_it_fatturapa_in", + "l10n_it_sdi_channel", + "web", + ], + "data": [ + # "security/ir.model.access.csv", + # "views/fetchmail_view.xml", + "views/sdi_view.xml", + # "views/sdi_view.xml", + # "views/ir_mail_server.xml", + # "data/fetchmail_data.xml", + # "data/sdi_channel_demo.xml", + ], + "installable": True, +} diff --git a/l10n_it_fatturapa_govway/controllers/__init__.py b/l10n_it_fatturapa_govway/controllers/__init__.py new file mode 100644 index 000000000000..12a7e529b674 --- /dev/null +++ b/l10n_it_fatturapa_govway/controllers/__init__.py @@ -0,0 +1 @@ +from . import main diff --git a/l10n_it_fatturapa_govway/controllers/main.py b/l10n_it_fatturapa_govway/controllers/main.py new file mode 100644 index 000000000000..188549dc22b3 --- /dev/null +++ b/l10n_it_fatturapa_govway/controllers/main.py @@ -0,0 +1,90 @@ +import logging + +from odoo.http import Controller, request, route + +_logger = logging.getLogger() + + +class FatturaPAGovWay(Controller): + # incoming invoices + @route(["/fatturapa/govway/ricevi_fattura"], type="http", auth="user", website=True) + def ricevi_fattura(self, *args, **post): + # headers + # - GovWay-SDI-FormatoArchivioBase64 + # - GovWay-SDI-FormatoArchivioInvioFattura + # - GovWay-SDI-FormatoFatturaPA + # - GovWay-SDI-IdentificativoSdI + # - GovWay-SDI-MessageId + # - GovWay-SDI-NomeFile + # - GovWay-SDI-NomeFileMetadati + # - GovWay-Transaction-ID + identificativo_sdi = request.httprequest.headers.get( + "GovWay-SDI-IdentificativoSdI", "" + ) + sdi_nomefile = request.httprequest.headers.get("GovWay-SDI-NomeFile", "") + transaction_id = request.httprequest.headers.get("GovWay-Transaction-ID", "") + + sdi_formatoarchiviobase64 = request.httprequest.headers.get( + "GovWay-SDI-FormatoArchivioBase64", "" + ) + sdi_formatoarchiviiinviofattura = request.httprequest.headers.get( + "GovWay-SDI-FormatoArchivioInvioFattura", "" + ) + sdi_formartofatturapa = request.httprequest.headers.get( + "GovWay-SDI-FormatoFatturaPA", "" + ) + sdi_messageid = request.httprequest.headers.get("GovWay-SDI-MessageId", "") + sdi_nomefile_metadati = request.httprequest.headers.get( + "GovWay-SDI-NomeFileMetadati", "" + ) + + _logger.info( + f"ricevi_fattura(): {identificativo_sdi} {sdi_nomefile} {transaction_id}" + ) + _logger.debug(f"ricevi_fattura(): args={repr(args)}") + _logger.debug(f"ricevi_fattura(): post={repr(post)}") + + @route(["/fatturapa/govway/ricevi_ndt"], type="http", auth="user", website=True) + def ricevi_ndt(self, *args, **post): + # headers + # - GovWay-SDI-IdentificativoSdI + # - GovWay-SDI-NomeFile + # - GovWay-Transaction-ID + identificativo_sdi = request.httprequest.headers.get( + "GovWay-SDI-IdentificativoSdI", "" + ) + sdi_nomefile = request.httprequest.headers.get("GovWay-SDI-NomeFile", "") + transaction_id = request.httprequest.headers.get("GovWay-Transaction-ID", "") + + _logger.info( + f"ricevi_ndt(): {identificativo_sdi} {sdi_nomefile} {transaction_id}" + ) + _logger.debug(f"ricevi_ndt(): args={repr(args)}") + _logger.debug(f"ricevi_ndt(): post={repr(post)}") + + # outgoing invoices + @route( + ["/fatturapa/govway/ricevi_notifica"], + type="http", + auth="user", + methods=["POST"], + website=True, + ) + def ricevi_notifica(self, *args, **post): + # headers: + # - GovWay-SDI-IdentificativoSdI + # - GovWay-SDI-NomeFile + # - GovWay-Transaction-ID + identificativo_sdi = request.httprequest.headers.get( + "GovWay-SDI-IdentificativoSdI", "" + ) + sdi_nomefile = request.httprequest.headers.get("GovWay-SDI-NomeFile", "") + transaction_id = request.httprequest.headers.get("GovWay-Transaction-ID", "") + + _logger.info( + f"ricevi_notifica(): {identificativo_sdi} {sdi_nomefile} {transaction_id}" + ) + _logger.debug(f"ricevi_notifica(): args={repr(args)}") + _logger.debug(f"ricevi_notifica(): post={repr(post)}") + # request.env["sdi.channel"].sdi_channel_model.receive_notification( + # { sdi_nomefile: post }) diff --git a/l10n_it_fatturapa_govway/data/sdi_channel_demo.xml b/l10n_it_fatturapa_govway/data/sdi_channel_demo.xml new file mode 100644 index 000000000000..65b8e2547e60 --- /dev/null +++ b/l10n_it_fatturapa_govway/data/sdi_channel_demo.xml @@ -0,0 +1,7 @@ + + + + GovWay + govway + + diff --git a/l10n_it_fatturapa_govway/models/__init__.py b/l10n_it_fatturapa_govway/models/__init__.py new file mode 100644 index 000000000000..530ecd9cfcf8 --- /dev/null +++ b/l10n_it_fatturapa_govway/models/__init__.py @@ -0,0 +1,5 @@ +# from . import fatturapa_attachment_out +# from . import mail_thread +# from . import ir_mail_server +# from . import fetchmail +from . import sdi diff --git a/l10n_it_fatturapa_govway/models/fatturapa_attachment_out.py b/l10n_it_fatturapa_govway/models/fatturapa_attachment_out.py new file mode 100644 index 000000000000..00d362a83f3b --- /dev/null +++ b/l10n_it_fatturapa_govway/models/fatturapa_attachment_out.py @@ -0,0 +1,147 @@ +# Author(s): Andrea Colangelo (andreacolangelo@openforce.it) +# Copyright 2018 Openforce Srls Unipersonale (www.openforce.it) +# Copyright 2018 Sergio Corato (https://efatto.it) +# Copyright 2018-2019 Lorenzo Battistini + +import logging +import re + +from odoo import _, fields, models +from odoo.exceptions import UserError + +_logger = logging.getLogger(__name__) + +RESPONSE_MAIL_REGEX = ( + "[A-Z]{2}[a-zA-Z0-9]{11,16}_[a-zA-Z0-9]{,5}_[A-Z]{2}_" "[a-zA-Z0-9]{,3}" +) + + +class FatturaPAAttachmentOut(models.Model): + _inherit = "fatturapa.attachment.out" + + def _message_type_ns( + self, root, id_sdi, message_id, receipt_dt, fatturapa_attachment_out + ): + error_list = root.find("ListaErrori") + error_str = "" + for error in error_list: + error_str += "\n[%s] %s %s" % ( + error.find("Codice").text if error.find("Codice") is not None else "", + error.find("Descrizione").text + if error.find("Descrizione") is not None + else "", + error.find("Suggerimento").text + if error.find("Suggerimento") is not None + else "", + ) + fatturapa_attachment_out.write( + { + "state": "sender_error", + "last_sdi_response": f"SdI ID: {id_sdi}; " + f"Message ID: {message_id}; Receipt date: {receipt_dt}; " + f"Error: {error_str}", + } + ) + + def _message_type_mc( + self, root, id_sdi, message_id, receipt_dt, fatturapa_attachment_out + ): + missed_delivery_note = root.find("Descrizione").text + fatturapa_attachment_out.write( + { + "state": "recipient_error", + "last_sdi_response": f"SdI ID: {id_sdi}; " + f"Message ID: {message_id}; Receipt date: {receipt_dt}; " + f"Missed delivery note: {missed_delivery_note}", + } + ) + + def _message_type_rc( + self, root, id_sdi, message_id, receipt_dt, fatturapa_attachment_out + ): + delivery_dt = root.find("DataOraConsegna").text + fatturapa_attachment_out.write( + { + "state": "validated", + "delivered_date": fields.Datetime.now(), + "last_sdi_response": f"SdI ID: {id_sdi}; " + f"Message ID: {message_id}; Receipt date: {receipt_dt}; " + f"Delivery date: {delivery_dt}", + } + ) + + def _message_type_ne(self, root, id_sdi, message_id, fatturapa_attachment_out): + esito_committente = root.find("EsitoCommittente") + if esito_committente is not None: + # more than one esito? + esito = esito_committente.find("Esito") + state = "" + if esito is not None: + if esito.text == "EC01": + state = "accepted" + elif esito.text == "EC02": + state = "rejected" + fatturapa_attachment_out.write( + { + "state": state, + "last_sdi_response": f"SdI ID: {id_sdi}; " + f"Message ID: {message_id}; Response: {esito.text}; ", + } + ) + + def _message_type_dt( + self, root, id_sdi, message_id, receipt_dt, fatturapa_attachment_out + ): + description = root.find("Descrizione") + if description is not None: + fatturapa_attachment_out.write( + { + "state": "validated", + "last_sdi_response": f"SdI ID: {id_sdi}; " + f"Message ID: {message_id}; Receipt date: {receipt_dt}; " + f"Description: {description.text}", + } + ) + + def _message_type_at( + self, root, id_sdi, message_id, receipt_dt, fatturapa_attachment_out + ): + description = root.find("Descrizione") + if description is not None: + fatturapa_attachment_out.write( + { + "state": "accepted", + "last_sdi_response": ( + "SdI ID: {}; Message ID: {}; Receipt date: {};" + " Description: {}" + ).format(id_sdi, message_id, receipt_dt, description.text), + } + ) + + def parse_pec_response(self, message_dict): + message_dict["model"] = self._name + message_dict["res_id"] = 0 + + regex = re.compile(RESPONSE_MAIL_REGEX) + notifications = [x for x in message_dict["attachments"] if regex.match(x.fname)] + + if not notifications: + raise UserError( + _( + 'PEC message "%s" is coming from SDI but attachments do not ' + "match SDI response format. Please check." + ) + % (message_dict["subject"]) + ) + + sdi_channel_model = self.env["sdi.channel"] + attachments = sdi_channel_model.receive_notification( + { + notification.fname: notification.content + for notification in notifications + }, + ) + + # Link the message to the last attachment updated + message_dict["res_id"] = attachments[-1].id + return message_dict diff --git a/l10n_it_fatturapa_govway/models/fetchmail.py b/l10n_it_fatturapa_govway/models/fetchmail.py new file mode 100644 index 000000000000..9aa812658e61 --- /dev/null +++ b/l10n_it_fatturapa_govway/models/fetchmail.py @@ -0,0 +1,198 @@ +# Copyright 2018 Lorenzo Battistini + +import logging + +from odoo import _, fields, models + +_logger = logging.getLogger(__name__) +MAX_POP_MESSAGES = 50 + + +class Fetchmail(models.Model): + _inherit = "fetchmail.server" + + is_fatturapa_pec = fields.Boolean("E-invoice PEC server") + + def _default_e_inv_notify_partner_ids(self): + return [(6, 0, [self.env.user.partner_id.id])] + + last_pec_error_message = fields.Text("Last PEC Error Message", readonly=True) + pec_error_count = fields.Integer("PEC error count", readonly=True) + e_inv_notify_partner_ids = fields.Many2many( + "res.partner", + string="Contacts to notify", + help="Contacts to notify when PEC message can't be processed", + domain=[("email", "!=", False)], + default=_default_e_inv_notify_partner_ids, + ) + + def fetch_mail_server_type_imap( + self, server, MailThread, error_messages, **additional_context + ): + imap_server = None + try: + imap_server = server.connect() + imap_server.select() + result, data = imap_server.search(None, "(UNSEEN)") + for num in data[0].split(): + result, data = imap_server.fetch(num, "(RFC822)") + imap_server.store(num, "-FLAGS", "\\Seen") + try: + MailThread.with_context(**additional_context).message_process( + server.object_id.model, + data[0][1], + save_original=server.original, + strip_attachments=(not server.attach), + ) + # if message is processed without exceptions + server.last_pec_error_message = "" + except Exception as e: + server.manage_pec_failure(e, error_messages) + continue + imap_server.store(num, "+FLAGS", "\\Seen") + # We need to commit because message is processed: + # Possible next exceptions, out of try, should not + # rollback processed messages + self._cr.commit() # pylint: disable=invalid-commit + except Exception as e: + server.manage_pec_failure(e, error_messages) + finally: + if imap_server: + imap_server.close() + imap_server.logout() + + def fetch_mail_server_type_pop( + self, server, MailThread, error_messages, **additional_context + ): + pop_server = None + try: + while True: + pop_server = server.connect() + (num_messages, total_size) = pop_server.stat() + pop_server.list() + for num in range(1, min(MAX_POP_MESSAGES, num_messages) + 1): + (header, messages, octets) = pop_server.retr(num) + message = "\n".join(messages) + try: + MailThread.with_context(**additional_context).message_process( + server.object_id.model, + message, + save_original=server.original, + strip_attachments=(not server.attach), + ) + pop_server.dele(num) + # See the comments in the IMAP part + server.last_pec_error_message = "" + except Exception as e: + server.manage_pec_failure(e, error_messages) + continue + # pylint: disable=invalid-commit + self._cr.commit() + if num_messages < MAX_POP_MESSAGES: + break + pop_server.quit() + except Exception as e: + server.manage_pec_failure(e, error_messages) + finally: + if pop_server: + pop_server.quit() + + def fetch_mail(self): + for server in self: + if not server.is_fatturapa_pec: + super(Fetchmail, server).fetch_mail() + else: + additional_context = {"fetchmail_cron_running": True} + # Setting fetchmail_cron_running to avoid to disable cron while + # cron is running (otherwise it would be done by setting + # server.state = 'draft', + # see _update_cron method) + server = server.with_context(**additional_context) + MailThread = self.env["mail.thread"] + _logger.info( + "start checking for new e-invoices on %s server %s", + server.server_type, + server.name, + ) + additional_context["fetchmail_server_id"] = server.id + additional_context["server_type"] = server.server_type + error_messages = list() + if server.server_type == "imap": + server.fetch_mail_server_type_imap( + server, MailThread, error_messages, **additional_context + ) + elif server.server_type == "pop": + server.fetch_mail_server_type_pop( + server, MailThread, error_messages, **additional_context + ) + if error_messages: + server.notify_or_log(error_messages) + server.pec_error_count += 1 + max_retry = self.env["ir.config_parameter"].get_param( + "fetchmail.pec.max.retry" + ) + if server.pec_error_count > int(max_retry): + # Setting to draft prevents new e-invoices to + # be sent via PEC. + # Resetting server state only after N fails. + # So that the system can try to fetch again after + # temporary connection errors + server.state = "draft" + server.notify_about_server_reset() + else: + server.pec_error_count = 0 + server.write({"date": fields.Datetime.now()}) + return True + + def manage_pec_failure(self, exception, error_messages): + self.ensure_one() + _logger.info( + "Failure when fetching emails " + f"using {self.server_type} server {self.name}.", + exc_info=True, + ) + + exception_msg = str(exception) + # `str` on Odoo exceptions does not return + # a nice representation of the error + odoo_exc_string = getattr(exception, "name", None) + if odoo_exc_string: + exception_msg = odoo_exc_string + + self.last_pec_error_message = exception_msg + error_messages.append(exception_msg) + return True + + def notify_about_server_reset(self): + self.ensure_one() + self.notify_or_log( + _("PEC server %s has been reset. Last error message is '%s'") + % (self.name, self.last_pec_error_message) + ) + + def notify_or_log(self, message): + """ + Send an email to partners in + self.e_inv_notify_partner_ids containing message. + + :param: message + :type message: list of str, or str + """ + self.ensure_one() + if isinstance(message, list): + message = "
".join(message) + + if self.e_inv_notify_partner_ids: + self.env["mail.mail"].create( + { + "subject": _("Fetchmail PEC server [%s] error") % self.name, + "body_html": message, + "recipient_ids": [(6, 0, self.e_inv_notify_partner_ids.ids)], + } + ).send() + _logger.info( + "Notifying partners %s about PEC server %s error" + % (self.e_inv_notify_partner_ids.ids, self.name) + ) + else: + _logger.error("Can't notify anyone about PEC server %s error" % self.name) diff --git a/l10n_it_fatturapa_govway/models/ir_mail_server.py b/l10n_it_fatturapa_govway/models/ir_mail_server.py new file mode 100644 index 000000000000..e1261d4a1c48 --- /dev/null +++ b/l10n_it_fatturapa_govway/models/ir_mail_server.py @@ -0,0 +1,41 @@ +# # Copyright 2018 Sergio Corato (https://efatto.it) +# # Copyright 2018 Lorenzo Battistini +# # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +# +# from odoo import api, fields, models +# +# +# class IrMailServer(models.Model): +# _inherit = "ir.mail_server" +# +# is_fatturapa_pec = fields.Boolean("E-invoice PEC server") +# email_from_for_fatturaPA = fields.Char("Sender Email Address") +# +# def test_smtp_connection(self): +# for server in self: +# if server.is_fatturapa_pec: +# # self.env.user.email is used to test SMTP connection +# server.env.user.email = server.email_from_for_fatturaPA +# # no need to revert to correct email: UserError is always raised and +# # rollback done +# return super(IrMailServer, self).test_smtp_connection() +# +# @api.model +# def _search( +# self, +# args, +# offset=0, +# limit=None, +# order=None, +# count=False, +# access_rights_uid=None, +# ): +# if args == [] and order == "sequence" and limit == 1: +# # This happens in ir.mail_server.connect method when no SMTP server is +# # explicitly set. +# # In this case (sending normal emails without expliciting SMTP server) +# # the e-invoice PEC server must not be used +# args = [("is_fatturapa_pec", "=", False)] +# return super(IrMailServer, self)._search( +# args, offset, limit, order, count, access_rights_uid +# ) diff --git a/l10n_it_fatturapa_govway/models/mail_thread.py b/l10n_it_fatturapa_govway/models/mail_thread.py new file mode 100644 index 000000000000..d9159f3be9df --- /dev/null +++ b/l10n_it_fatturapa_govway/models/mail_thread.py @@ -0,0 +1,243 @@ +# Author(s): Andrea Colangelo (andreacolangelo@openforce.it) +# Copyright 2018 Openforce Srls Unipersonale (www.openforce.it) +# Copyright 2018 Sergio Corato (https://efatto.it) +# Copyright 2018 Lorenzo Battistini + +import base64 +import logging +import re + +from odoo import _, api, models +from odoo.exceptions import UserError + +_logger = logging.getLogger(__name__) + +FATTURAPA_IN_REGEX = ( + "^(IT[a-zA-Z0-9]{11,16}|" + "(?!IT)[A-Z]{2}[a-zA-Z0-9]{2,28})" + "_[a-zA-Z0-9]{1,5}" + "\\.(xml|XML|Xml|zip|ZIP|Zip|p7m|P7M|P7m)" + "(\\.(p7m|P7M|P7m))?$" +) +RESPONSE_MAIL_REGEX = ( + "(IT[a-zA-Z0-9]{11,16}|" + "(?!IT)[A-Z]{2}[a-zA-Z0-9]{2,28})" + "_[a-zA-Z0-9]{1,5}" + "_MT_[a-zA-Z0-9]{,3}" +) + +fatturapa_regex = re.compile(FATTURAPA_IN_REGEX) +response_regex = re.compile(RESPONSE_MAIL_REGEX) + + +class MailThread(models.AbstractModel): + _inherit = "mail.thread" + + def clean_message_dict(self, message_dict): + del message_dict["attachments"] + del message_dict["cc"] + del message_dict["from"] + del message_dict["to"] + del message_dict["recipients"] + del message_dict["references"] + del message_dict["in_reply_to"] + del message_dict["bounced_email"] + del message_dict["bounced_partner"] + del message_dict["bounced_msg_id"] + del message_dict["bounced_message"] + + @api.model + def message_route( + self, message, message_dict, model=None, thread_id=None, custom_values=None + ): + if any( + "@pec.fatturapa.it" in x + for x in [ + message.get("Reply-To", ""), + message.get("From", ""), + message.get("Return-Path", ""), + ] + ): + _logger.info( + "Processing FatturaPA PEC with Message-Id: " "{}".format( + message.get("Message-Id") + ) + ) + fatturapa_attachments = [ + x for x in message_dict["attachments"] if fatturapa_regex.match(x.fname) + ] + response_attachments = [ + x for x in message_dict["attachments"] if response_regex.match(x.fname) + ] + if response_attachments and fatturapa_attachments: + return self.manage_pec_fe_attachments( + message, message_dict, response_attachments + ) + else: + return self.manage_pec_sdi_notification(message, message_dict) + + elif self._context.get("fetchmail_server_id", False): + # This is not an email coming from SDI + fetchmail_server = self.env["fetchmail.server"].browse( + self._context["fetchmail_server_id"] + ) + if fetchmail_server.is_fatturapa_pec: + att = self.find_attachment_by_subject(message_dict["subject"]) + if att: + return self.manage_pec_sdi_response(att, message_dict) + raise UserError( + _( + 'PEC message "%s" has been read ' + "but not processed, as not related to an " + "e-invoice.\n" + "Please check PEC mailbox %s, at server %s," + " with user %s." + ) + % ( + message_dict["subject"], + fetchmail_server.name, + fetchmail_server.server, + fetchmail_server.user, + ) + ) + return super(MailThread, self).message_route( + message, + message_dict, + model=model, + thread_id=thread_id, + custom_values=custom_values, + ) + + def manage_pec_sdi_response(self, att, message_dict): + # This is a PEC response (CONSEGNA o ACCETTAZIONE) + # related to a message sent to SDI by us + message_dict["model"] = "fatturapa.attachment.out" + message_dict["res_id"] = att.id + self.clean_message_dict(message_dict) + self.env["mail.message"].with_context( + message_create_from_mail_mail=True + ).create(message_dict) + return [] + + def manage_pec_sdi_notification(self, message, message_dict): + # this is an SDI notification + message_dict = self.env["fatturapa.attachment.out"].parse_pec_response( + message_dict + ) + message_dict["record_name"] = message_dict["subject"] + attachment_ids = self._message_post_process_attachments( + message_dict["attachments"], [], message_dict + ).get("attachment_ids") + message_dict["attachment_ids"] = attachment_ids + self.clean_message_dict(message_dict) + # message_create_from_mail_mail to avoid to notify message + # (see mail.message.create) + self.env["mail.message"].with_context( + message_create_from_mail_mail=True + ).create(message_dict) + _logger.info( + "Routing FatturaPA PEC E-Mail with Message-Id: {}".format( + message.get("Message-Id") + ) + ) + return [] + + def manage_pec_fe_attachments(self, message, message_dict, response_attachments): + # this is an electronic invoice + if len(response_attachments) > 1: + _logger.info("More than 1 message found in mail of incoming invoice") + message_dict["model"] = "fatturapa.attachment.in" + message_dict["record_name"] = message_dict["subject"] + message_dict["res_id"] = 0 + attachment_ids = self._message_post_process_attachments( + message_dict["attachments"], [], message_dict + ).get("attachment_ids") + for attachment in self.env["ir.attachment"].browse( + [att_id for m, att_id in attachment_ids] + ): + if fatturapa_regex.match(attachment.name): + self.create_fatturapa_attachment_in(attachment, message_dict) + message_dict["attachment_ids"] = attachment_ids + self.clean_message_dict(message_dict) + # model and res_id are only needed by + # _message_post_process_attachments: we don't attach to + del message_dict["model"] + del message_dict["res_id"] + # message_create_from_mail_mail to avoid to notify message + # (see mail.message.create) + self.env["mail.message"].with_context( + message_create_from_mail_mail=True + ).create(message_dict) + _logger.info( + "Routing FatturaPA PEC E-Mail with Message-Id: {}".format( + message.get("Message-Id") + ) + ) + return [] + + def find_attachment_by_subject(self, subject): + attachment_out_model = self.env["fatturapa.attachment.out"] + if "CONSEGNA: " in subject: + att_name = subject.replace("CONSEGNA: ", "") + fatturapa_attachment_out = attachment_out_model.search( + [("name", "=", att_name)] + ) + if not fatturapa_attachment_out: + fatturapa_attachment_out = attachment_out_model.search( + [("name", "=", att_name)] + ) + if len(fatturapa_attachment_out) == 1: + return fatturapa_attachment_out + if "ACCETTAZIONE: " in subject: + att_name = subject.replace("ACCETTAZIONE: ", "") + fatturapa_attachment_out = attachment_out_model.search( + [("name", "=", att_name)] + ) + if not fatturapa_attachment_out: + fatturapa_attachment_out = attachment_out_model.search( + [("name", "=", att_name)] + ) + if len(fatturapa_attachment_out) == 1: + return fatturapa_attachment_out + return attachment_out_model.browse() + + def create_fatturapa_attachment_in(self, attachment, message_dict=None): + fetchmail_server_id = self.env.context.get("fetchmail_server_id") + received_date = False + if message_dict is not None and "date" in message_dict: + received_date = message_dict["date"] + company_id = False + # The incoming supplier e-bill doesn't carry which company + # we must use to create the given fatturapa.attachment.in record, + # so we expect fetchmail_server_id coming in the context + # see fetchmail.py. + # With this information we search which SDI is actually using it, + # finally the SDI contain both company and user we would need to use + sdi_channel_model = self.env["sdi.channel"] + if fetchmail_server_id: + sdi_chan = sdi_channel_model.search( + [("fetch_pec_server_id", "=", fetchmail_server_id)], limit=1 + ) + if sdi_chan: + # See check_fetch_pec_server_id + company_id = sdi_chan.company_id.id + file_name_content_dict = { + attachment.name: base64.b64decode(attachment.datas), + } + default_values = { + "company_id": company_id, + "e_invoice_received_date": received_date, + } + attachments = sdi_channel_model.receive_fe( + file_name_content_dict, + dict(), # Not managing metadata files for now + **default_values, + ) + + # Notify if there was an error + # during automatic import of invoices from PEC. + for attachment in attachments: + parsing_error = attachment.e_invoice_parsing_error + if parsing_error: + raise Exception(parsing_error) + return attachments diff --git a/l10n_it_fatturapa_govway/models/sdi.py b/l10n_it_fatturapa_govway/models/sdi.py new file mode 100644 index 000000000000..94963ff46244 --- /dev/null +++ b/l10n_it_fatturapa_govway/models/sdi.py @@ -0,0 +1,164 @@ +import base64 +from urllib.parse import urljoin + +import requests + +from odoo import _, fields, models +from odoo.exceptions import UserError + + +class SdiChannel(models.Model): + _inherit = "sdi.channel" + + channel_type = fields.Selection( + selection_add=[("govway", "GovWay")], ondelete={"govway": "cascade"} + ) + govway_url = fields.Char( + string="GovWay URL", + ) + govway_user = fields.Char(string="GovWay User") + govway_password = fields.Char(string="GovWay Password") + + # @api.constrains("fetch_pec_server_id") + # def check_fetch_pec_server_id(self): + # pec_channels = self.filtered(lambda c: c.channel_type == "pec") + # for channel in pec_channels: + # domain = [ + # ("fetch_pec_server_id", "=", channel.fetch_pec_server_id.id), + # ("id", "in", pec_channels.ids), + # ] + # elements = self.search(domain) + # if len(elements) > 1: + # raise exceptions.ValidationError( + # _("The channel %s with pec server %s already exists") + # % (channel.name, channel.fetch_pec_server_id.name) + # ) + # + # @api.constrains("pec_server_id") + # def check_pec_server_id(self): + # pec_channels = self.filtered(lambda c: c.channel_type == "pec") + # for channel in pec_channels: + # domain = [ + # ("pec_server_id", "=", channel.pec_server_id.id), + # ("id", "in", pec_channels.ids), + # ] + # elements = self.search(domain) + # if len(elements) > 1: + # raise exceptions.ValidationError( + # _("The channel %s with pec server %s already exists") + # % (channel.name, channel.pec_server_id.name) + # ) + # + # @api.constrains("email_exchange_system") + # def check_email_validity(self): + # if self.env.context.get("skip_check_email_validity"): + # return + # pec_channels = self.filtered(lambda c: c.channel_type == "pec") + # for channel in pec_channels: + # if not extract_rfc2822_addresses(channel.email_exchange_system): + # raise exceptions.ValidationError( + # _("Email %s is not valid") % channel.email_exchange_system + # ) + # + # def check_first_pec_sending(self): + # if not self.first_invoice_sent: + # sdi_address = self.env["ir.config_parameter"].get_param( + # "sdi.pec.first.address", + # ) + # self.email_exchange_system = sdi_address + # else: + # if not self.email_exchange_system: + # raise exceptions.UserError( + # _( + # "SDI PEC address not set. Please update it with the " + # "address indicated by SDI after the first sending" + # ) + # ) + # + # def update_after_first_pec_sending(self): + # if not self.first_invoice_sent: + # self.first_invoice_sent = True + # self.with_context( + # skip_check_email_validity=True + # ).email_exchange_system = False + + def send_via_govway(self, attachment_out_ids): + if not self.govway_url: + raise UserError(_("Missing GovWay URL")) + for att in attachment_out_ids: + if not att.datas or not att.name: + raise UserError(_("File content and file name are mandatory")) + company = att.company_id + vat = company.vat.split("IT")[1] + url = ( + "govway/sdi/out/xml2soap/Pretecno" + "/CentroServiziFatturaPA/SdIRiceviFile/v1" + ) + url = urljoin(self.govway_url, url) + params = { + "Versione": "FPR12", # VersioneFatturaPA + "TipoFile": "XML", # P7M: application/pkcs7-mime + "IdPaese": "IT", + "IdCodice": vat, + } + try: + response = requests.post( + url=url, + data=base64.b64decode(att.datas), + params=params, + timeout=60, + auth=(self.govway_user, self.govway_password), + headers={"Content-Type": "application/octet-stream"}, + ) + if not response.ok: + # response_data = json.loads(response.text) + # status = response_data.get("status") + # if status and status.get("error_code", False): + raise Exception( + _( + "Failed to fetch from CoinMarketCap with error code: " + "%s and error message: %s" + ) + % (response.status_code, response.text) + ) + except Exception as e: + raise UserError( + _("GovWay server not available for %s. Please configure it.") + % str(e) + ) + return {"result": "ok"} # response_data.get("data", {}) + mail_message = self.env["mail.message"].create( + { + "model": att._name, + "res_id": att.id, + "subject": att.name, + "body": "XML file for FatturaPA {} sent to Exchange System to " + "the email address {}.".format( + att.name, company.email_exchange_system + ), + "attachment_ids": [(6, 0, att.ir_attachment_id.ids)], + "email_from": company.email_from_for_fatturaPA, + "reply_to": company.email_from_for_fatturaPA, + "mail_server_id": company.sdi_channel_id.pec_server_id.id, + } + ) + + mail = self.env["mail.mail"].create( + { + "mail_message_id": mail_message.id, + "body_html": mail_message.body, + "email_to": company.email_exchange_system, + "headers": {"Return-Path": company.email_from_for_fatturaPA}, + } + ) + + if mail: + try: + mail.send(raise_exception=True) + att.state = "sent" + att.sending_date = fields.Datetime.now() + att.sending_user = user.id + company.sdi_channel_id.update_after_first_pec_sending() + except MailDeliveryException as e: + att.state = "sender_error" + mail.body = str(e) diff --git a/l10n_it_fatturapa_govway/pyproject.toml b/l10n_it_fatturapa_govway/pyproject.toml new file mode 100644 index 000000000000..4231d0cccb3d --- /dev/null +++ b/l10n_it_fatturapa_govway/pyproject.toml @@ -0,0 +1,3 @@ +[build-system] +requires = ["whool"] +build-backend = "whool.buildapi" diff --git a/l10n_it_fatturapa_govway/static/description/icon.png b/l10n_it_fatturapa_govway/static/description/icon.png new file mode 100644 index 000000000000..1f07394c3b79 Binary files /dev/null and b/l10n_it_fatturapa_govway/static/description/icon.png differ diff --git a/l10n_it_fatturapa_govway/views/sdi_view.xml b/l10n_it_fatturapa_govway/views/sdi_view.xml new file mode 100644 index 000000000000..92cbbb9fe901 --- /dev/null +++ b/l10n_it_fatturapa_govway/views/sdi_view.xml @@ -0,0 +1,18 @@ + + + + + sdi.channel.inherit.view.form + sdi.channel + + + + + + + + + + + + diff --git a/l10n_it_fatturapa_out/tests/data/IT06363391001_00006.xml b/l10n_it_fatturapa_out/tests/data/IT04366690248_00006.xml similarity index 98% rename from l10n_it_fatturapa_out/tests/data/IT06363391001_00006.xml rename to l10n_it_fatturapa_out/tests/data/IT04366690248_00006.xml index ff44b7d9a2dc..668a8674fab0 100644 --- a/l10n_it_fatturapa_out/tests/data/IT06363391001_00006.xml +++ b/l10n_it_fatturapa_out/tests/data/IT04366690248_00006.xml @@ -4,7 +4,7 @@ IT - 06363391001 + 04366690248 00006 FPR12