Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 37 additions & 28 deletions api/comms/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,24 @@ func chatCreate(db dbv1.DBTX, ctx context.Context, userId int32, ts time.Time, p
}

for _, blast := range blasts {
messageId := trashid.BlastMessageID(blast.BlastID, params.ChatID)

_, err = db.Exec(ctx, `
insert into chat_message
(message_id, chat_id, user_id, created_at, blast_id)
values
($1, $2, $3, $4, $5)
on conflict do nothing
`, trashid.BlastMessageID(blast.BlastID, params.ChatID), params.ChatID, blast.FromUserID, blast.CreatedAt, blast.BlastID)
`, messageId, params.ChatID, blast.FromUserID, blast.CreatedAt.UTC(), blast.BlastID)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note here the usage of .UTC(). On my machine, when running this, the location() on this time.Time was being set to Local and that was causing us to insert messages in the past. Since chat_blast uses a timestamptz column type and chat_message uses a plain timestamp column type, pgx semantics when inserting will cause it to try and map from the timezone in the incoming variable to the timezone of the machine running postgres.
Kind of gnarly 😬

if err != nil {
return err
}
}

err = chatUpdateLatestFields(db, ctx, params.ChatID)
if err != nil {
return err
}

return err
}
Expand Down Expand Up @@ -317,37 +322,37 @@ func getNewBlasts(tx dbv1.DBTX, ctx context.Context, arg getNewBlastsParams) ([]
// see also: subtly different inverse query exists in chat_blast.go
// to fan out messages to existing chat
var findNewBlasts = `
with
last_permission_change as (
select max(t) as t from (
select updated_at as t from chat_permissions where user_id = $1
union
select created_at as t from chat_blocked_users where blocker_user_id = $1
union
select to_timestamp(0)
) as timestamp_subquery
WITH
last_permission_change AS (
SELECT max(t) AS t FROM (
SELECT updated_at AS t FROM chat_permissions WHERE user_id = @user_id
UNION
SELECT created_at AS t FROM chat_blocked_users WHERE blocker_user_id = @user_id
UNION
SELECT to_timestamp(0)
) AS timestamp_subquery
),
all_new as (
select *
from chat_blast blast
where
from_user_id in (
all_new AS (
SELECT *
FROM chat_blast blast
WHERE
from_user_id IN (
-- follower_audience
SELECT followee_user_id AS from_user_id
FROM follows
WHERE blast.audience = 'follower_audience'
AND follows.followee_user_id = blast.from_user_id
AND follows.follower_user_id = $1
AND follows.follower_user_id = @user_id
AND follows.is_delete = false
AND follows.created_at < blast.created_at
)
OR from_user_id in (
OR from_user_id IN (
-- tipper_audience
SELECT receiver_user_id
FROM user_tips tip
WHERE blast.audience = 'tipper_audience'
AND receiver_user_id = blast.from_user_id
AND sender_user_id = $1
AND sender_user_id = @user_id
AND tip.created_at < blast.created_at
)
OR from_user_id IN (
Expand All @@ -358,29 +363,31 @@ func getNewBlasts(tx dbv1.DBTX, ctx context.Context, arg getNewBlastsParams) ([]
JOIN tracks og ON remixes.parent_track_id = og.track_id
WHERE blast.audience = 'remixer_audience'
AND og.owner_id = blast.from_user_id
AND t.owner_id = $1
AND t.owner_id = @user_id
AND (
blast.audience_content_id IS NULL
OR (
blast.audience_content_type = 'track'
AND blast.audience_content_id = og.track_id
)
)
AND t.created_at < blast.created_at
)
OR from_user_id IN (
-- customer_audience
SELECT seller_user_id
FROM usdc_purchases p
WHERE blast.audience = 'customer_audience'
AND p.seller_user_id = blast.from_user_id
AND p.buyer_user_id = $1
AND p.buyer_user_id = @user_id
AND (
audience_content_id IS NULL
OR (
blast.audience_content_type = p.content_type::text
AND blast.audience_content_id = p.content_id
)
)
AND p.created_at < blast.created_at
)
OR from_user_id IN (
-- coin_holder_audience via sol_user_balances
Expand All @@ -389,19 +396,21 @@ func getNewBlasts(tx dbv1.DBTX, ctx context.Context, arg getNewBlastsParams) ([]
JOIN sol_user_balances sub ON sub.mint = ac.mint
WHERE blast.audience = 'coin_holder_audience'
AND ac.user_id = blast.from_user_id
AND sub.user_id = $1
AND sub.user_id = @user_id
AND sub.balance > 0
-- TODO: PE-6663 This isn't entirely correct yet, need to check "time of most recent membership"
AND sub.created_at < blast.created_at
)
)
select * from all_new
where created_at > (select t from last_permission_change)
and chat_allowed(from_user_id, $1)
order by created_at
`

rows, err := tx.Query(ctx, findNewBlasts, arg.UserID)
SELECT * FROM all_new
WHERE created_at > (select t from last_permission_change)
AND chat_allowed(from_user_id, @user_id)
ORDER BY created_at
;`

rows, err := tx.Query(ctx, findNewBlasts, pgx.NamedArgs{
"user_id": arg.UserID,
})
if err != nil {
return nil, err
}
Expand Down
9 changes: 2 additions & 7 deletions api/comms/chat_blast.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import (

// Result struct to hold chat_id and to_user_id
type ChatBlastResult struct {
ChatID string `db:"chat_id"`
ToUserID int32 `db:"to_user_id"`
ChatID string `db:"chat_id"`
}

type OutgoingChatMessage struct {
Expand Down Expand Up @@ -80,11 +79,7 @@ func chatBlast(db dbv1.DBTX, ctx context.Context, userId int32, ts time.Time, pa
defer rows.Close()

// Scan the results into the results slice
results, err = pgx.CollectRows(rows, func(row pgx.CollectableRow) (ChatBlastResult, error) {
var result ChatBlastResult
err := row.Scan(&result.ChatID, &result.ToUserID)
return result, err
})
results, err = pgx.CollectRows(rows, pgx.RowToStructByName[ChatBlastResult])
if err != nil {
return nil, err
}
Expand Down
Loading
Loading