revert B: Move to webhook_subscriptions
This commit is contained in:
parent
2759c78061
commit
b5967c8e8c
|
@ -75,7 +75,7 @@ func getAuthSecret(c *gin.Context, body []byte) (string, error) {
|
||||||
return "", fmt.Errorf("There are no events in the delivery. Something is wrong")
|
return "", fmt.Errorf("There are no events in the delivery. Something is wrong")
|
||||||
}
|
}
|
||||||
|
|
||||||
webhook, err := mongo.FindPcoWebhookSubscriptionForUser(*userObjectId, event[0].Name)
|
webhook, err := mongo.FindPcoSubscriptionForUser(*userObjectId, event[0].Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", errors.Join(fmt.Errorf("Failed to find pco subscription for user: %s and event: %s", userObjectId.Hex(), event[0].Name), err)
|
return "", errors.Join(fmt.Errorf("Failed to find pco subscription for user: %s and event: %s", userObjectId.Hex(), event[0].Name), err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,36 +41,6 @@ func (api *PcoApiClient) GetSubscriptions() ([]webhooks.Subscription, error) {
|
||||||
return subscriptions, nil
|
return subscriptions, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *PcoApiClient) GetWebhookSubscriptions() ([]webhooks.WebhookSubscription, error) {
|
|
||||||
api.Url().Path = "/webhooks/v2/webhook_subscriptions"
|
|
||||||
|
|
||||||
req, err := http.NewRequest(http.MethodGet, api.Url().String(), nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := api.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
if resp.StatusCode > 299 || resp.StatusCode < 200 {
|
|
||||||
if raw, err := io.ReadAll(resp.Body); err == nil {
|
|
||||||
return nil, fmt.Errorf("Failed to retrieve subscriptions with status code: %d. Error %s", resp.StatusCode, string(raw))
|
|
||||||
} else {
|
|
||||||
return nil, fmt.Errorf("Failed to retrieve subscriptions with status code: %d", resp.StatusCode)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
subscriptions, err := jsonapi.UnmarshalManyPayload[webhooks.WebhookSubscription](resp.Body)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return subscriptions, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Posts subscriptions to PCO api and returns a new list of subscriptions
|
// Posts subscriptions to PCO api and returns a new list of subscriptions
|
||||||
func (api *PcoApiClient) CreateSubscriptions(subscriptions []webhooks.Subscription) ([]webhooks.Subscription, error) {
|
func (api *PcoApiClient) CreateSubscriptions(subscriptions []webhooks.Subscription) ([]webhooks.Subscription, error) {
|
||||||
api.Url().Path = "/webhooks/v2/subscriptions"
|
api.Url().Path = "/webhooks/v2/subscriptions"
|
||||||
|
@ -144,77 +114,3 @@ func (api *PcoApiClient) CreateSubscription(subscription *webhooks.Subscription)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Posts subscriptions to PCO api and returns a new list of subscriptions
|
|
||||||
func (api *PcoApiClient) CreateWebhookSubscriptions(subscriptions []webhooks.WebhookSubscription) ([]webhooks.WebhookSubscription, error) {
|
|
||||||
api.Url().Path = "/webhooks/v2/webhook_subscriptions"
|
|
||||||
|
|
||||||
body := bytes.NewBuffer([]byte{})
|
|
||||||
err := jsonapi.MarshalPayload(body, subscriptions)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
req, err := http.NewRequest(http.MethodPost, api.Url().String(), body)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
req.Header.Add("Content-Type", "application/json")
|
|
||||||
|
|
||||||
resp, err := api.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.StatusCode > 299 || resp.StatusCode < 200 {
|
|
||||||
if raw, err := io.ReadAll(resp.Body); err == nil {
|
|
||||||
return nil, fmt.Errorf("Failed to create subscriptions with status code: %d. Error %s", resp.StatusCode, string(raw))
|
|
||||||
} else {
|
|
||||||
return nil, fmt.Errorf("Failed to create subscriptions with status code: %d", resp.StatusCode)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
new_subscriptions, err := jsonapi.UnmarshalManyPayload[webhooks.WebhookSubscription](resp.Body)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return new_subscriptions, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Posts subcription to PCO api and updates the subscription at the pointer that was passed to the fuinction with the server response
|
|
||||||
func (api *PcoApiClient) CreateWebhookSubscription(subscription *webhooks.WebhookSubscription) error {
|
|
||||||
api.Url().Path = "/webhooks/v2/webhook_subscriptions"
|
|
||||||
|
|
||||||
body := bytes.NewBuffer([]byte{})
|
|
||||||
err := jsonapi.MarshalPayload(body, subscription)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
req, err := http.NewRequest(http.MethodPost, api.Url().String(), body)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
req.Header.Add("Content-Type", "application/json")
|
|
||||||
|
|
||||||
resp, err := api.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.StatusCode > 299 || resp.StatusCode < 200 {
|
|
||||||
if raw, err := io.ReadAll(resp.Body); err == nil {
|
|
||||||
return fmt.Errorf("Failed to create subscriptions with status code: %d. Error %s", resp.StatusCode, string(raw))
|
|
||||||
} else {
|
|
||||||
return fmt.Errorf("Failed to create subscription with status code: %d", resp.StatusCode)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
err = jsonapi.UnmarshalPayload(resp.Body, subscription)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
|
@ -17,7 +17,7 @@ type actionFunc func(user *models.User) error
|
||||||
|
|
||||||
var (
|
var (
|
||||||
actionFuncs map[string]actionFunc = map[string]actionFunc{"pco.plan": setupPcoSubscriptions}
|
actionFuncs map[string]actionFunc = map[string]actionFunc{"pco.plan": setupPcoSubscriptions}
|
||||||
webhooksTemplate map[string]webhooks.WebhookSubscription = map[string]webhooks.WebhookSubscription{
|
webhooksTemplate map[string]webhooks.Subscription = map[string]webhooks.Subscription{
|
||||||
"services.v2.events.plan.created": {
|
"services.v2.events.plan.created": {
|
||||||
Active: true,
|
Active: true,
|
||||||
Name: "services.v2.events.plan.created",
|
Name: "services.v2.events.plan.created",
|
||||||
|
@ -114,8 +114,8 @@ func setupPcoSubscriptions(user *models.User) error {
|
||||||
pcoApi := pco.NewClientWithOauthConfig(conf.Vendors[models.PCO_VENDOR_NAME].OauthConfig(), tokenSource)
|
pcoApi := pco.NewClientWithOauthConfig(conf.Vendors[models.PCO_VENDOR_NAME].OauthConfig(), tokenSource)
|
||||||
|
|
||||||
//Check if subscriptions already exist
|
//Check if subscriptions already exist
|
||||||
webhookMap := make(map[string]webhooks.WebhookSubscription)
|
webhookMap := make(map[string]webhooks.Subscription)
|
||||||
subscriptions, err := pcoApi.GetWebhookSubscriptions()
|
subscriptions, err := pcoApi.GetSubscriptions()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Join(fmt.Errorf("Failed to find subscriptions for user: %s", user.Id), err)
|
return errors.Join(fmt.Errorf("Failed to find subscriptions for user: %s", user.Id), err)
|
||||||
}
|
}
|
||||||
|
@ -131,11 +131,11 @@ func setupPcoSubscriptions(user *models.User) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
builtHooks := make([]webhooks.WebhookSubscription, 0, len(webhooksTemplate))
|
builtHooks := make([]webhooks.Subscription, 0, len(webhooksTemplate))
|
||||||
//Build subscriptions
|
//Build subscriptions
|
||||||
for _, templ := range webhooksTemplate {
|
for _, templ := range webhooksTemplate {
|
||||||
if _, ok := webhookMap[templ.Name]; !ok {
|
if _, ok := webhookMap[templ.Name]; !ok {
|
||||||
builtHooks = append(builtHooks, webhooks.WebhookSubscription{
|
builtHooks = append(builtHooks, webhooks.Subscription{
|
||||||
Active: true,
|
Active: true,
|
||||||
Name: templ.Name,
|
Name: templ.Name,
|
||||||
Url: fmt.Sprintf(templ.Url, conf.AppSettings.WebhookServiceUrl, user.Id.Hex()),
|
Url: fmt.Sprintf(templ.Url, conf.AppSettings.WebhookServiceUrl, user.Id.Hex()),
|
||||||
|
@ -145,14 +145,14 @@ func setupPcoSubscriptions(user *models.User) error {
|
||||||
|
|
||||||
//Todo: save subscriptions for succesfull hooksetups
|
//Todo: save subscriptions for succesfull hooksetups
|
||||||
for index := range builtHooks {
|
for index := range builtHooks {
|
||||||
err = pcoApi.CreateWebhookSubscription(&builtHooks[index])
|
err = pcoApi.CreateSubscription(&builtHooks[index])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Join(fmt.Errorf("Failed to create subscription: %s for user: %s", builtHooks[index].Name, user.Id), err)
|
return errors.Join(fmt.Errorf("Failed to create subscription: %s for user: %s", builtHooks[index].Name ,user.Id), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//Save Subscriptions
|
//Save Subscriptions
|
||||||
err = mongo.SaveWebhookSubscriptionsForUser(user.Id, builtHooks...)
|
err = mongo.SaveSubscriptionsForUser(user.Id, builtHooks...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Join(fmt.Errorf("Failed to save subscriptions for user: %s", user.Id), err)
|
return errors.Join(fmt.Errorf("Failed to save subscriptions for user: %s", user.Id), err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,29 +34,3 @@ func (obj *PcoSubscription) UpdateObjectInfo() {
|
||||||
}
|
}
|
||||||
obj.UpdatedAt = now
|
obj.UpdatedAt = now
|
||||||
}
|
}
|
||||||
|
|
||||||
type PcoWebhookSubscription struct {
|
|
||||||
*CommonFields `bson:"obj_info"`
|
|
||||||
Id primitive.ObjectID `bson:"_id,omitempty"`
|
|
||||||
UserId primitive.ObjectID `bson:"user_id,omitempty"`
|
|
||||||
Details *webhooks.WebhookSubscription `bson:"details,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func (obj *PcoWebhookSubscription) MongoId() primitive.ObjectID {
|
|
||||||
if obj.Id.IsZero() {
|
|
||||||
now := time.Now()
|
|
||||||
obj.Id = primitive.NewObjectIDFromTimestamp(now)
|
|
||||||
}
|
|
||||||
|
|
||||||
return obj.Id
|
|
||||||
}
|
|
||||||
|
|
||||||
func (obj *PcoWebhookSubscription) UpdateObjectInfo() {
|
|
||||||
now := time.Now()
|
|
||||||
if obj.CommonFields == nil {
|
|
||||||
obj.CommonFields = new(CommonFields)
|
|
||||||
obj.EntityType = PCO_SUBSCRIPTION_TYPE
|
|
||||||
obj.CreatedAt = now
|
|
||||||
}
|
|
||||||
obj.UpdatedAt = now
|
|
||||||
}
|
|
||||||
|
|
35
ui/db/pco.go
35
ui/db/pco.go
|
@ -35,29 +35,6 @@ func (db *DB) FindPcoSubscriptionForUser(userId primitive.ObjectID, eventName st
|
||||||
return subscription, nil
|
return subscription, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// using userId and event string return PCO Subscriptions saved to the DB
|
|
||||||
func (db *DB) FindPcoWebhookSubscriptionForUser(userId primitive.ObjectID, eventName string) (*models.PcoWebhookSubscription, error) {
|
|
||||||
conf := config.Config()
|
|
||||||
|
|
||||||
opts := options.FindOne()
|
|
||||||
res := db.client.Database(conf.Mongo.EntDb).Collection(conf.Mongo.EntCol).FindOne(context.Background(), bson.M{"user_id": userId, "obj_info.ent": models.PCO_SUBSCRIPTION_TYPE, "details.name": eventName}, opts)
|
|
||||||
|
|
||||||
if res.Err() != nil {
|
|
||||||
if res.Err() == mongo.ErrNoDocuments {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
return nil, res.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
subscription := &models.PcoWebhookSubscription{}
|
|
||||||
err := res.Decode(subscription)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return subscription, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Okay so learned something here. Interfaces are determined implemented for the type a method is related to.
|
// Okay so learned something here. Interfaces are determined implemented for the type a method is related to.
|
||||||
// This function is not implemented for DB it is implemented for *DB and that is important
|
// This function is not implemented for DB it is implemented for *DB and that is important
|
||||||
func (db *DB) SaveSubscriptionsForUser(userId primitive.ObjectID, subscriptions ...webhooks.Subscription) error {
|
func (db *DB) SaveSubscriptionsForUser(userId primitive.ObjectID, subscriptions ...webhooks.Subscription) error {
|
||||||
|
@ -71,15 +48,3 @@ func (db *DB) SaveSubscriptionsForUser(userId primitive.ObjectID, subscriptions
|
||||||
|
|
||||||
return saveModels(db, mods...)
|
return saveModels(db, mods...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) SaveWebhookSubscriptionsForUser(userId primitive.ObjectID, subscriptions ...webhooks.WebhookSubscription) error {
|
|
||||||
mods := make([]*models.PcoWebhookSubscription, 0, len(subscriptions))
|
|
||||||
for _, sub := range subscriptions {
|
|
||||||
mods = append(mods, &models.PcoWebhookSubscription{
|
|
||||||
UserId: userId,
|
|
||||||
Details: &sub,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
return saveModels(db, mods...)
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue