diff --git a/app/Console/Commands/ExternalScheduleFeedIngestionCommand.php b/app/Console/Commands/ExternalScheduleFeedIngestionCommand.php index 01ac628f..d4f5a886 100644 --- a/app/Console/Commands/ExternalScheduleFeedIngestionCommand.php +++ b/app/Console/Commands/ExternalScheduleFeedIngestionCommand.php @@ -59,6 +59,7 @@ final class ExternalScheduleFeedIngestionCommand extends Command { public function handle() { + $this->info("starting summits external ingestion"); $start = time(); $this->service->ingestAllSummits(); $end = time(); diff --git a/app/Models/Foundation/Summit/Repositories/ISummitEventRepository.php b/app/Models/Foundation/Summit/Repositories/ISummitEventRepository.php index be2387da..5162ef4c 100644 --- a/app/Models/Foundation/Summit/Repositories/ISummitEventRepository.php +++ b/app/Models/Foundation/Summit/Repositories/ISummitEventRepository.php @@ -50,4 +50,11 @@ interface ISummitEventRepository extends IBaseRepository * @param int $event_id */ public function cleanupScheduleAndFavoritesForEvent($event_id); + + /** + * @param Summit $summit + * @param array $external_ids + * @return mixed + */ + public function getPublishedEventsBySummitNotInExternalIds(Summit $summit, array $external_ids); } \ No newline at end of file diff --git a/app/Models/OAuth2/ResourceServerContext.php b/app/Models/OAuth2/ResourceServerContext.php index de8c1e64..ea517fdb 100644 --- a/app/Models/OAuth2/ResourceServerContext.php +++ b/app/Models/OAuth2/ResourceServerContext.php @@ -11,7 +11,6 @@ * See the License for the specific language governing permissions and * limitations under the License. **/ - use Illuminate\Support\Facades\Log; use libs\utils\ITransactionService; use models\main\IMemberRepository; @@ -126,31 +125,25 @@ final class ResourceServerContext implements IResourceServerContext public function getCurrentUser(): ?Member { return $this->tx_service->transaction(function() { - Log::debug("ResourceServerContext::getCurrentUser"); $member = null; // legacy test, for new IDP version this value came on null $id = $this->getCurrentUserExternalId(); if(!is_null($id)){ - Log::debug(sprintf("ResourceServerContext::getCurrentUser: getCurrentUserExternalId is %s", $id)); $member = $this->member_repository->getById(intval($id)); if(!is_null($member)) return $member; } // is null if(is_null($member)){ - Log::debug("ResourceServerContext::getCurrentUser: getCurrentUserExternalId is null"); // try to get by external id $id = $this->getCurrentUserId(); if(is_null($id)) { - Log::debug("ResourceServerContext::getCurrentUser: getCurrentUserId is null"); return null; } - Log::debug(sprintf("ResourceServerContext::getCurrentUser: getCurrentUserId is %s", $id)); $member = $this->member_repository->getByExternalId(intval($id)); } if(is_null($member)){ - Log::debug("ResourceServerContext::getCurrentUser: member is null"); // we assume that is new idp version and claims alreaady exists on context $user_external_id = $this->getAuthContextVar('user_id'); $user_first_name = $this->getAuthContextVar('user_first_name'); diff --git a/app/Repositories/Summit/DoctrineSpeakerRepository.php b/app/Repositories/Summit/DoctrineSpeakerRepository.php index 890f770e..2bf9a75a 100644 --- a/app/Repositories/Summit/DoctrineSpeakerRepository.php +++ b/app/Repositories/Summit/DoctrineSpeakerRepository.php @@ -489,8 +489,8 @@ SQL; public function getByFullName(string $fullname): ?PresentationSpeaker { $speakerFullNameParts = explode(" ", $fullname); - $speakerFirstName = trim(trim(array_pop($speakerFullNameParts))); - $speakerLastName = trim(implode(" ", $speakerFullNameParts)); + $speakerLastName = trim(trim(array_pop($speakerFullNameParts))); + $speakerFirstName = trim(implode(" ", $speakerFullNameParts)); return $this->getEntityManager() ->createQueryBuilder() diff --git a/app/Repositories/Summit/DoctrineSummitEventRepository.php b/app/Repositories/Summit/DoctrineSummitEventRepository.php index 170236c2..98efce88 100644 --- a/app/Repositories/Summit/DoctrineSummitEventRepository.php +++ b/app/Repositories/Summit/DoctrineSummitEventRepository.php @@ -14,6 +14,7 @@ use App\Models\Foundation\Main\IGroup; use Doctrine\ORM\Tools\Pagination\Paginator; use models\summit\ISummitEventRepository; +use models\summit\Summit; use models\summit\SummitEvent; use App\Repositories\SilverStripeDoctrineRepository; use utils\DoctrineCaseFilterMapping; @@ -351,4 +352,23 @@ final class DoctrineSummitEventRepository $data ); } + + /** + * @param Summit $summit, + * @param array $external_ids + * @return mixed + */ + public function getPublishedEventsBySummitNotInExternalIds(Summit $summit, array $external_ids) + { + $query = $this->getEntityManager()->createQueryBuilder() + ->select("e") + ->from(\models\summit\SummitEvent::class, "e") + ->join('e.summit', 's', Join::WITH, " s.id = :summit_id") + ->where('e.published = 1') + ->andWhere('e.external_id not in (:external_ids)') + ->setParameter('summit_id', $summit->getId()) + ->setParameter('external_ids', $external_ids); + + return $query->getQuery()->getResult(); + } } \ No newline at end of file diff --git a/app/Repositories/Summit/DoctrineSummitRepository.php b/app/Repositories/Summit/DoctrineSummitRepository.php index 0d72d414..ee10e6cb 100644 --- a/app/Repositories/Summit/DoctrineSummitRepository.php +++ b/app/Repositories/Summit/DoctrineSummitRepository.php @@ -175,7 +175,7 @@ final class DoctrineSummitRepository ->andWhere("e.api_feed_url <> ''") ->andWhere("e.api_feed_key is not null") ->andWhere("e.api_feed_key <>''") - ->orderBy('e.begin_date', 'DESC') + ->orderBy('e.id', 'DESC') ->getQuery() ->getResult(); } diff --git a/app/Services/Model/ScheduleIngestionService.php b/app/Services/Model/ScheduleIngestionService.php index 40d5fd6e..65e43ff4 100644 --- a/app/Services/Model/ScheduleIngestionService.php +++ b/app/Services/Model/ScheduleIngestionService.php @@ -105,29 +105,25 @@ final class ScheduleIngestionService */ public function ingestAllSummits(): void { - foreach ($this->summit_repository->getWithExternalFeed() as $summit) { - $processedExternalIds = $this->tx_service->transaction(function () use($summit) { + $summits = $this->tx_service->transaction(function () { + return $this->summit_repository->getWithExternalFeed(); + }); + + foreach ($summits as $summit) { + + $processedExternalIds = $this->ingestSummit($summit); + + $this->tx_service->transaction(function () use ($summit, $processedExternalIds) { + foreach ($this->event_repository->getPublishedEventsBySummitNotInExternalIds($summit, $processedExternalIds) as $presentation) { try { - return $this->ingestSummit($summit); + $this->event_repository->delete($presentation); } catch (Exception $ex) { - Log::error(sprintf("error external feed for summit id %s", $summit->getId())); Log::error($ex); } - }); - - $this->tx_service->transaction(function () use($summit, $processedExternalIds) { - foreach ($summit->getPublishedPresentations() as $presentation) { - try { - if ($presentation instanceof Presentation && !empty($presentation->getExternalId()) && !in_array($presentation->getExternalId(), $processedExternalIds)) - $this->event_repository->delete($presentation); - } - catch (Exception $ex) { - Log::error($ex); - } - } - }); - } + } + }); + } } /** @@ -138,20 +134,22 @@ final class ScheduleIngestionService public function ingestSummit(Summit $summit): array { - return $this->tx_service->transaction(function () use ($summit) { - $processedExternalIds = []; + $processedExternalIds = []; - try { - Log::debug(sprintf("ingesting summit %s", $summit->getId())); - $feed = $this->feed_factory->build($summit); - if (is_null($feed)) - throw new \InvalidArgumentException("invalid feed"); + try { + $start = time(); + $summit_id = $summit->getId(); + Log::debug(sprintf("ScheduleIngestionService::ingestSummit:: ingesting summit %s", $summit->getId())); + $feed = $this->feed_factory->build($summit); + if (is_null($feed)) + throw new \InvalidArgumentException("invalid feed"); + $this->tx_service->transaction(function () use ($summit_id) { + $summit = $this->summit_repository->getById($summit_id); $mainVenues = $summit->getMainVenues(); if (count($mainVenues) == 0) throw new ValidationException(sprintf("summit %s does not has a main venue set!", $summit->getId())); - // get first as default - $mainVenue = $mainVenues[0]; + if (is_null($summit->getBeginDate()) || is_null($summit->getEndDate())) throw new ValidationException(sprintf("summit %s does not has set begin date/end date", $summit->getId())); @@ -159,9 +157,6 @@ final class ScheduleIngestionService if (is_null($summit->getTimeZone())) throw new ValidationException(sprintf("summit %s does not has set a valid time zone", $summit->getId())); - $events = $feed->getEvents(); - $speakers = $feed->getSpeakers(); - // get presentation type from summit $presentationType = $summit->getEventTypeByType(IPresentationType::Presentation); if (is_null($presentationType)) { @@ -174,39 +169,40 @@ final class ScheduleIngestionService $presentationType->setMinModerators(0); $summit->addEventType($presentationType); } + }); - $trackStorage = []; - $locationStorage = []; - $affiliationStorage = []; + $events = $feed->getEvents(); + $speakers = $feed->getSpeakers(); - foreach ($events as $event) { + foreach ($events as $event) { - try { + try { - // track + $external_id = $this->tx_service->transaction(function () use ($summit_id, $event, $speakers) { + Log::debug(sprintf("processing event %s - %s for summit %s", $event['external_id'], $event['title'], $summit_id)); + // get first as default + $summit = $this->summit_repository->getById($summit_id); + if (is_null($summit) || !$summit instanceof Summit) return null; + $mainVenues = $summit->getMainVenues(); + $mainVenue = $mainVenues[0]; + $presentationType = $summit->getEventTypeByType(IPresentationType::Presentation); $track = $summit->getPresentationCategoryByTitle($event['track']); - if (is_null($track) && isset($trackStorage[$event['track']])) - $track = $trackStorage[$event['track']]; if (is_null($track)) { $track = new PresentationCategory(); $track->setTitle($event['track']); $summit->addPresentationCategory($track); - $trackStorage[$event['track']] = $track; } // location $location = null; if (isset($event['location'])) { $location = $summit->getLocationByName($event['location']); - if (is_null($location) && isset($locationStorage[$event['location']])) - $location = $locationStorage[$event['location']]; if (is_null($location)) { $location = new SummitVenueRoom(); $location->setName($event['location']); $mainVenue->addRoom($location); - $locationStorage[$event['location']] = $location; } } @@ -219,10 +215,10 @@ final class ScheduleIngestionService $speakerFirstName = trim(implode(" ", $speakerFullNameParts)); $foundSpeaker = isset($speakers[$speakerFullName]) ? $speakers[$speakerFullName] : null; - if(is_null($foundSpeaker)){ + if (is_null($foundSpeaker)) { // partial match - $result_array = preg_grep("/{$speakerFullName}/i",array_keys($speakers)); - if(count($result_array) > 0){ + $result_array = preg_grep("/{$speakerFullName}/i", array_keys($speakers)); + if (count($result_array) > 0) { $foundSpeaker = $speakers[array_values($result_array)[0]]; } } @@ -245,8 +241,6 @@ final class ScheduleIngestionService // check affiliations if (!empty($companyName)) { $affiliation = $member->getAffiliationByOrgName($companyName); - if (is_null($affiliation) && isset($affiliationStorage[sprintf("%s_%s", $member->getId(), $companyName)])) - $affiliation = $affiliationStorage[sprintf("%s_%s", $member->getId(), $companyName)]; if (is_null($affiliation)) { $affiliation = new Affiliation(); @@ -259,7 +253,6 @@ final class ScheduleIngestionService $affiliation->setOrganization($org); $affiliation->setIsCurrent(true); $member->addAffiliation($affiliation); - $affiliationStorage[sprintf("%s_%s", $member->getId(), $companyName)] = $affiliation; } } @@ -321,18 +314,26 @@ final class ScheduleIngestionService if (!$presentation->isPublished()) $presentation->publish(); - $processedExternalIds[] = $event['external_id']; - } catch (Exception $ex) { - Log::warning(sprintf("error external feed for summit id %s", $summit->getId())); - Log::warning($ex); - } - } - } catch (Exception $ex) { - Log::warning(sprintf("error external feed for summit id %s", $summit->getId())); - Log::warning($ex); - } + return $event['external_id']; + }); + if (!is_null($external_id)) + $processedExternalIds[] = $external_id; + + } catch (Exception $ex) { + Log::warning(sprintf("error external feed for summit id %s", $summit->getId())); + Log::warning($ex); + } + } + $end = time(); + $delta = $end - $start; + log::debug(sprintf("ScheduleIngestionService::ingestSummit execution call %s seconds - summit %s", $delta, $summit->getId())); + + } catch (Exception $ex) { + Log::warning(sprintf("error external feed for summit id %s", $summit->getId())); + Log::warning($ex); + } + + return $processedExternalIds; - return $processedExternalIds; - }); } } \ No newline at end of file diff --git a/app/Services/Utils/DoctrineTransactionService.php b/app/Services/Utils/DoctrineTransactionService.php index 027b15b5..c895b64f 100644 --- a/app/Services/Utils/DoctrineTransactionService.php +++ b/app/Services/Utils/DoctrineTransactionService.php @@ -15,7 +15,8 @@ use Illuminate\Support\Facades\Log; use libs\utils\ITransactionService; use Closure; use LaravelDoctrine\ORM\Facades\Registry; - +use Doctrine\DBAL\Exception\RetryableException; +use Exception; /** * Class DoctrineTransactionService * @package services\utils @@ -27,6 +28,8 @@ final class DoctrineTransactionService implements ITransactionService */ private $manager_name; + const MaxRetries = 3; + /** * DoctrineTransactionService constructor. * @param string $manager_name @@ -36,7 +39,6 @@ final class DoctrineTransactionService implements ITransactionService $this->manager_name = $manager_name; } - /** * Execute a Closure within a transaction. * @@ -47,25 +49,57 @@ final class DoctrineTransactionService implements ITransactionService */ public function transaction(Closure $callback) { - $em = Registry::getManager($this->manager_name); - $con = $em->getConnection(); + $retry = 0; + $done = false; + $result = null; - if (!$em->isOpen()) { - Log::warning("entity manager closed!, trying to re open..."); - $em = $em->create($con->getConnection(), $em->getConfiguration()); - $con = $em->getConnection(); + while (!$done and $retry < self::MaxRetries) { + try { + $em = Registry::getManager($this->manager_name); + $con = $em->getConnection(); + + /** + * Some database systems close the connection after a period of time, in MySQL this is system variable + * `wait_timeout`. Given the daemon is meant to run indefinitely we need to make sure we have an open + * connection before working any job. Otherwise we would see `MySQL has gone away` type errors. + */ + + if ($con->ping() === false) { + Log::warning("DoctrineTransactionService::transaction: conn is closed... reconecting"); + $con->close(); + $con->connect(); + } + + if (!$em->isOpen()) { + Log::warning("DoctrineTransactionService::transaction: entity manager is closed!, trying to re open..."); + $em = Registry::resetManager($this->manager_name); + // new entity manager + $con = $em->getConnection(); + } + + $con->beginTransaction(); // suspend auto-commit + $result = $callback($this); + $em->flush(); + $con->commit(); + $done = true; + } catch (RetryableException $ex) { + Log::warning("retrying ..."); + Registry::resetManager($this->manager_name); + $con->rollBack(); + Log::warning($ex); + $retry++; + if ($retry === self::MaxRetries) { + throw $ex; + } + } catch (Exception $ex) { + Log::warning("rolling back transaction"); + $em->close(); + $con->rollBack(); + Log::error($ex); + throw $ex; + } } - try { - $con->beginTransaction(); // suspend auto-commit - $result = $callback($this); - $em->flush(); - $con->commit(); - } catch (\Exception $e) { - $con->rollBack(); - Log::error($e); - throw $e; - } return $result; } -} \ No newline at end of file +}